Skip to content

Commit

Permalink
Merge pull request #14 from lidofinance/route-alerts
Browse files Browse the repository at this point in the history
feat: route alerts by their severities
  • Loading branch information
sergeyWh1te authored Sep 3, 2024
2 parents 17178ec + 7df03fb commit fdf5058
Show file tree
Hide file tree
Showing 14 changed files with 187 additions and 270 deletions.
61 changes: 13 additions & 48 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,7 @@ func main() {
app.Metrics.BuildInfo.Inc()
app.RegisterWorkerRoutes(r)

// common
discorderConsumer := `Discorder`
telegrammerConsumer := `Telegrammer`
opsGeniaConsumer := `OpsGeniaer`

// protocol
protocolDiscorderConsumer := `ProtocolDiscorder`
protocolTelegrammerConsumer := `ProtocolTelegrammer`
protocolOpsGeniaConsumer := `ProtocolOpsGeniaer`

// devOps
devOpsDiscorderConsumer := `DevOpsDiscorder`
devOpsTelegrammerConsumer := `DevOpsDiscorder`

_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, discorderConsumer)
_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, telegrammerConsumer)
_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, opsGeniaConsumer)

_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, protocolDiscorderConsumer)
_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, protocolTelegrammerConsumer)
_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, protocolOpsGeniaConsumer)

_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, devOpsDiscorderConsumer)
_ = js.DeleteConsumer(ctx, cfg.AppConfig.NatsStreamName, devOpsTelegrammerConsumer)

_ = js.DeleteStream(ctx, cfg.AppConfig.NatsStreamName)

protocolSubject := fmt.Sprintf(`%s.%s`, cfg.AppConfig.NatsStreamName, teams.Protocol)
devOpsSubject := fmt.Sprintf(`%s.%s`, cfg.AppConfig.NatsStreamName, teams.DevOps)
fallbackSubject := fmt.Sprintf(`%s.%s`, cfg.AppConfig.NatsStreamName, registry.FallBackTeam)

commonStreamName := fmt.Sprintf(`%s_STREAM`, cfg.AppConfig.NatsStreamName)
Expand All @@ -101,8 +73,8 @@ func main() {
MaxAge: 10 * time.Minute,
Subjects: []string{
protocolSubject,
devOpsSubject,
fallbackSubject,
// To set up notifications for another team, add that team's subject to this list.
},
})

Expand All @@ -111,40 +83,33 @@ func main() {
return
}

protocolWorker := worker.NewWorker(
protocolSubject,
stream, log, metricsStore,
worker.WithTelegram(services.Telegram, protocolTelegrammerConsumer),
worker.WithDiscord(services.Discord, protocolDiscorderConsumer),
worker.WithOpsGenia(services.OpsGenia, protocolOpsGeniaConsumer),
const (
Telegram = `Telegram`
Discord = `Discord`
OpsGenie = `OpsGenie`
)

devOpsWorker := worker.NewWorker(
devOpsSubject,
protocolWorker := worker.NewWorker(
protocolSubject,
stream, log, metricsStore,
worker.WithTelegram(services.DevOpsTelegram, devOpsTelegrammerConsumer),
worker.WithDiscord(services.DevOpsDiscord, devOpsDiscorderConsumer),
// worker.WithOpsGenia(services.OpsGenia, `BlackBoxOpsGenia`),
worker.WithConsumer(services.OnChainAlertsTelegram, `OnChainAlerts_Telegram_Consumer`, registry.OnChainAlerts, Telegram),
worker.WithConsumer(services.OnChainUpdatesTelegram, `OnChainUpdates_Telegram_Consumer`, registry.OnChainUpdates, Telegram),
worker.WithConsumer(services.ErrorsTelegram, `Protocol_Errors_Telegram_Consumer`, registry.OnChainErrors, Telegram),
worker.WithConsumer(services.Discord, `Protocol_Discord_Consumer`, registry.FallBackAlerts, Discord),
worker.WithConsumer(services.OpsGenia, `Protocol_OpGenia_Consumer`, registry.OnChainAlerts, OpsGenie),
)

fallbackWorker := worker.NewWorker(
fallbackSubject,
stream, log, metricsStore,
worker.WithTelegram(services.Telegram, telegrammerConsumer),
worker.WithDiscord(services.Discord, discorderConsumer),
worker.WithOpsGenia(services.OpsGenia, opsGeniaConsumer),
worker.WithConsumer(services.Discord, `Fallback_Discord_Consumer`, registry.FallBackAlerts, Discord),
)

if wrkErr := protocolWorker.Run(gCtx, g); wrkErr != nil {
fmt.Println("Could not start protocolWorker error:", wrkErr.Error())
return
}

if devWkrErr := devOpsWorker.Run(gCtx, g); devWkrErr != nil {
fmt.Println("Could not start devOps error:", devWkrErr.Error())
return
}

if wrkErr := fallbackWorker.Run(gCtx, g); wrkErr != nil {
fmt.Println("Could not start fallbackWorker error:", wrkErr.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion example/alerts.http
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Content-Type: application/json
"alertId": "OZ-GNOSIS-EVENTS",
"name": "ethereum-steth-1",
"protocol": "ethereum",
"description": "Manual CRITICAL finding",
"description": "Manual HIGH finding",
"findingType": "UNKNOWN_TYPE",
"severity": "HIGH",
"metadata": {
Expand Down
29 changes: 13 additions & 16 deletions internal/app/server/usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ import (
"time"

"github.com/lidofinance/finding-forwarder/internal/connectors/metrics"

"github.com/lidofinance/finding-forwarder/internal/env"
"github.com/lidofinance/finding-forwarder/internal/pkg/notifiler"
)

type Services struct {
Telegram notifiler.Telegram
Discord notifiler.Discord
OpsGenia notifiler.OpsGenia

DevOpsTelegram notifiler.Telegram
DevOpsDiscord notifiler.Discord
OnChainAlertsTelegram notifiler.FindingSender
OnChainUpdatesTelegram notifiler.FindingSender
ErrorsTelegram notifiler.FindingSender
Discord notifiler.FindingSender
OpsGenia notifiler.FindingSender
}

func NewServices(cfg *env.AppConfig, metricsStore *metrics.Store) Services {
Expand All @@ -33,19 +31,18 @@ func NewServices(cfg *env.AppConfig, metricsStore *metrics.Store) Services {
Timeout: 10 * time.Second,
}

telegram := notifiler.NewTelegram(cfg.TelegramBotToken, cfg.TelegramChatID, httpClient, metricsStore, cfg.Source)
devOpsTelegram := notifiler.NewTelegram(cfg.DevOpsTelegramBotToken, cfg.DevOpsTelegramChatID, httpClient, metricsStore, cfg.Source)
alertsTelegram := notifiler.NewTelegram(cfg.TelegramBotToken, cfg.TelegramAlertsChatID, httpClient, metricsStore, cfg.Source)
updatesTelegram := notifiler.NewTelegram(cfg.TelegramBotToken, cfg.TelegramUpdatesChatID, httpClient, metricsStore, cfg.Source)
errorsTelegram := notifiler.NewTelegram(cfg.TelegramBotToken, cfg.TelegramErrorsChatID, httpClient, metricsStore, cfg.Source)

discord := notifiler.NewDiscord(cfg.DiscordWebHookURL, httpClient, metricsStore, cfg.Source)
devOpsDiscord := notifiler.NewDiscord(cfg.DevOpsDiscordWebHookURL, httpClient, metricsStore, cfg.Source)

opsGenia := notifiler.NewOpsGenia(cfg.OpsGeniaAPIKey, httpClient, metricsStore, cfg.Source)

return Services{
Telegram: telegram,
Discord: discord,
OpsGenia: opsGenia,
DevOpsTelegram: devOpsTelegram,
DevOpsDiscord: devOpsDiscord,
OnChainAlertsTelegram: alertsTelegram,
OnChainUpdatesTelegram: updatesTelegram,
ErrorsTelegram: errorsTelegram,
Discord: discord,
OpsGenia: opsGenia,
}
}
137 changes: 21 additions & 116 deletions internal/app/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,14 @@ import (
"github.com/lidofinance/finding-forwarder/generated/forta/models"
"github.com/lidofinance/finding-forwarder/internal/connectors/metrics"
"github.com/lidofinance/finding-forwarder/internal/pkg/notifiler"
"github.com/lidofinance/finding-forwarder/internal/utils/registry"
)

type telegramCarrier struct {
Name string
notifiler notifiler.Telegram
}

type opsGeniaCarrier struct {
Name string
notifiler notifiler.OpsGenia
}

type discordCarrier struct {
Name string
notifiler notifiler.Discord
// carrier binds one notification sender to a named consumer together with
// the severity filter and metrics label used when handling its messages.
type carrier struct {
// Name is the consumer name (WithConsumer's consumerName argument).
Name string
// SeveritySet is looked up by the alert's severity; an alert whose severity
// is not a key is acked without being sent.
SeveritySet registry.AlertMapping
// notifiler delivers the decoded alert via SendFinding.
notifiler notifiler.FindingSender
// channel is the value reported under the metrics.Channel label of SentAlerts.
channel string
}

type worker struct {
Expand All @@ -37,43 +30,19 @@ type worker struct {
log *slog.Logger
metrics *metrics.Store

telegramConsumers []telegramCarrier
opsGeniaConsumers []opsGeniaCarrier
discordConsumers []discordCarrier
carriers []carrier
}

// Channel names used as the metrics.Channel label value when counting sent alerts.
const (
Telegram = `Telegram`
Discord = `Discord`
OpsGenie = `OpsGenie`
)

// WorkerOptions is a functional option that configures a worker.
type WorkerOptions func(worker *worker)

func WithTelegram(telegram notifiler.Telegram, consumerName string) WorkerOptions {
func WithConsumer(notifier notifiler.FindingSender, consumerName string, severitySet registry.AlertMapping, channel string) WorkerOptions {
return func(w *worker) {
w.telegramConsumers = append(w.telegramConsumers, telegramCarrier{
Name: consumerName,
notifiler: telegram,
})
}
}

func WithDiscord(discord notifiler.Discord, consumerName string) WorkerOptions {
return func(w *worker) {
w.discordConsumers = append(w.discordConsumers, discordCarrier{
Name: consumerName,
notifiler: discord,
})
}
}

func WithOpsGenia(opsGenia notifiler.OpsGenia, consumerName string) WorkerOptions {
return func(w *worker) {
w.opsGeniaConsumers = append(w.opsGeniaConsumers, opsGeniaCarrier{
Name: consumerName,
notifiler: opsGenia,
w.carriers = append(w.carriers, carrier{
Name: consumerName,
SeveritySet: severitySet,
notifiler: notifier,
channel: channel,
})
}
}
Expand Down Expand Up @@ -107,100 +76,36 @@ func (w *worker) Run(ctx context.Context, g *errgroup.Group) error {
handler func(msg jetstream.Msg)
}

consumers := make([]Consumer, 0, len(w.telegramConsumers)+len(w.discordConsumers)+len(w.opsGeniaConsumers))
for _, telegramConsumer := range w.telegramConsumers {
consumers := make([]Consumer, 0, len(w.carriers))
for _, consumer := range w.carriers {
consumers = append(consumers, Consumer{
name: telegramConsumer.Name,
name: consumer.Name,
handler: func(msg jetstream.Msg) {
alert := new(models.Alert)

if alertErr := alert.UnmarshalBinary(msg.Data()); alertErr != nil {
w.log.Error(fmt.Sprintf(`Broken message: %v`, alertErr))
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: Telegram, metrics.Status: metrics.StatusFail}).Inc()
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: consumer.channel, metrics.Status: metrics.StatusFail}).Inc()
w.terminateMessage(msg)
return
}
defer func() {
alert = nil
}()

if sendErr := telegramConsumer.notifiler.SendMessage(ctx, fmt.Sprintf("%s\n\n%s", alert.Name, alert.Description)); sendErr != nil {
w.log.Error(fmt.Sprintf(`Could not send finding: %v`, sendErr))
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: Telegram, metrics.Status: metrics.StatusFail}).Inc()
w.nackMessage(msg)
return
}

w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: Telegram, metrics.Status: metrics.StatusOk}).Inc()
w.ackMessage(msg)
},
})
}

for _, opsGeniaConsumer := range w.opsGeniaConsumers {
consumers = append(consumers, Consumer{
name: opsGeniaConsumer.Name,
handler: func(msg jetstream.Msg) {
var alert models.Alert
if alertErr := alert.UnmarshalBinary(msg.Data()); alertErr != nil {
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: OpsGenie, metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf(`Broken message: %v`, alertErr))
w.terminateMessage(msg)
return
}

opsGeniaPriority := ""
switch alert.Severity {
case models.AlertSeverityCRITICAL:
opsGeniaPriority = "P2"
case models.AlertSeverityHIGH:
opsGeniaPriority = "P3"
}

if opsGeniaPriority == "" {
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: OpsGenie, metrics.Status: metrics.StatusOk}).Inc()
if _, ok := consumer.SeveritySet[alert.Severity]; !ok {
w.ackMessage(msg)
return
}

if sendErr := opsGeniaConsumer.notifiler.SendMessage(ctx, alert.Name, alert.Description,
alert.AlertID, opsGeniaPriority); sendErr != nil {
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: OpsGenie, metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf(`Could not send finding to OpsGenia: %s`, sendErr.Error()))
w.nackMessage(msg)
return
}

w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: OpsGenie, metrics.Status: metrics.StatusOk}).Inc()
w.ackMessage(msg)
},
})
}

for _, discordConsumer := range w.discordConsumers {
consumers = append(consumers, Consumer{
name: discordConsumer.Name,
handler: func(msg jetstream.Msg) {
alert := new(models.Alert)

if alertErr := alert.UnmarshalBinary(msg.Data()); alertErr != nil {
w.log.Error(fmt.Sprintf(`Broken message: %v`, alertErr))
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: Discord, metrics.Status: metrics.StatusFail}).Inc()
w.terminateMessage(msg)
return
}
defer func() {
alert = nil
}()

if sendErr := discordConsumer.notifiler.SendMessage(ctx, fmt.Sprintf("%s\n\n%s", alert.Name, alert.Description)); sendErr != nil {
if sendErr := consumer.notifiler.SendFinding(ctx, alert); sendErr != nil {
w.log.Error(fmt.Sprintf(`Could not send finding: %v`, sendErr))
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: Discord, metrics.Status: metrics.StatusFail}).Inc()
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: consumer.channel, metrics.Status: metrics.StatusFail}).Inc()
w.nackMessage(msg)
return
}

w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: Discord, metrics.Status: metrics.StatusOk}).Inc()
w.metrics.SentAlerts.With(prometheus.Labels{metrics.Channel: consumer.channel, metrics.Status: metrics.StatusOk}).Inc()
w.ackMessage(msg)
},
})
Expand Down
Loading

0 comments on commit fdf5058

Please sign in to comment.