Skip to content

Commit

Permalink
refactor alerting
Browse files Browse the repository at this point in the history
- use new rabbitmq pubsub code. fixes #34 fixes #27
- remove executor goroutines in favor of just spawing as many
  goroutines as are needed to handle alerting load.
- dont retry failed jobs.  The code that was in place didnt work
  due to the lru cache, retried jobs would just be skipped due to
  being in the cache already. issue #14
  • Loading branch information
woodsaj committed Jun 3, 2016
1 parent d6e8e22 commit a6bbc8a
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 273 deletions.
10 changes: 5 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/Dieterbe/profiletrigger/heap"
"github.com/grafana/grafana/pkg/log"
"github.com/raintank/met/helper"
//"github.com/raintank/worldping-api/pkg/alerting"
"github.com/raintank/worldping-api/pkg/alerting"
"github.com/raintank/worldping-api/pkg/api"
"github.com/raintank/worldping-api/pkg/cmd"
"github.com/raintank/worldping-api/pkg/events"
Expand Down Expand Up @@ -74,10 +74,10 @@ func main() {
collectoreventpublisher.Init(metricsBackend)

api.InitCollectorController(metricsBackend)
//if setting.AlertingEnabled {
// alerting.Init(metricsBackend)
// alerting.Construct()
//}
if setting.AlertingEnabled {
alerting.Init(metricsBackend)
alerting.Construct()
}

if err := notifications.Init(); err != nil {
log.Fatal(3, "Notification service failed to initialize", err)
Expand Down
88 changes: 29 additions & 59 deletions pkg/alerting/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package alerting

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
Expand All @@ -12,12 +11,11 @@ import (

"github.com/grafana/grafana/pkg/log"
"github.com/hashicorp/golang-lru"
"github.com/raintank/worldping-api/pkg/bus"
"github.com/raintank/worldping-api/pkg/graphite"
m "github.com/raintank/worldping-api/pkg/models"
"github.com/raintank/worldping-api/pkg/services/rabbitmq"
"github.com/raintank/worldping-api/pkg/services/notifications"
"github.com/raintank/worldping-api/pkg/services/sqlstore"
"github.com/raintank/worldping-api/pkg/setting"
"github.com/streadway/amqp"

bgraphite "bosun.org/graphite"
)
Expand All @@ -41,54 +39,25 @@ func GraphiteAuthContextReturner(org_id int64) (bgraphite.Context, error) {
return &ctx, nil
}

func ChanExecutor(fn GraphiteReturner, jobQueue JobQueue, cache *lru.Cache) {
func ChanExecutor(fn GraphiteReturner, jobQueue <-chan *Job, cache *lru.Cache) {
executorNum.Inc(1)
defer executorNum.Dec(1)

realQueue := jobQueue.(internalJobQueue).queue

for job := range realQueue {
jobQueueInternalItems.Value(int64(len(realQueue)))
jobQueueInternalSize.Value(int64(setting.InternalJobQueueSize))
if setting.AlertingInspect {
inspect(fn, job, cache)
} else {
execute(fn, job, cache)
}
}
}

// AmqpExecutor reads jobs from rabbitmq, executes them, and acknowledges them
// if they processed succesfully or encountered a fatal error
// (i.e. an error that we know won't recover on future retries, so no point in retrying)
func AmqpExecutor(fn GraphiteReturner, consumer rabbitmq.Consumer, cache *lru.Cache) {
executorNum.Inc(1)
defer executorNum.Dec(1)
consumer.Consume(func(msg *amqp.Delivery) error {
job := Job{}
if err := json.Unmarshal(msg.Body, &job); err != nil {
log.Error(0, "failed to unmarshal msg body.", err)
return nil
}
var err error
if setting.AlertingInspect {
inspect(GraphiteAuthContextReturner, &job, cache)
} else {
err = execute(GraphiteAuthContextReturner, &job, cache)
}
if err != nil {
if strings.HasPrefix(err.Error(), "fatal:") {
log.Error(0, "%s: removing job from queue", err.Error())
return nil
for j := range jobQueue {
go func(job *Job) {
jobQueueInternalItems.Value(int64(len(jobQueue)))
jobQueueInternalSize.Value(int64(setting.InternalJobQueueSize))
if setting.AlertingInspect {
inspect(fn, job, cache)
} else {
execute(fn, job, cache)
}
log.Error(0, "%s: not acking message. retry later", err.Error())
}
return err
})
}(j)
}
}

func inspect(fn GraphiteReturner, job *Job, cache *lru.Cache) {
key := fmt.Sprintf("%d-%d", job.MonitorId, job.LastPointTs.Unix())
key := fmt.Sprintf("%d-%d", job.CheckId, job.LastPointTs.Unix())
if found, _ := cache.ContainsOrAdd(key, true); found {
//log.Debug("Job %s already done", job)
return
Expand Down Expand Up @@ -116,7 +85,7 @@ func inspect(fn GraphiteReturner, job *Job, cache *lru.Cache) {
// errors are always prefixed with 'non-fatal' (i.e. error condition that imply retrying the job later might fix it)
// or 'fatal', when we're sure the job will never process successfully.
func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error {
key := fmt.Sprintf("%d-%d", job.MonitorId, job.LastPointTs.Unix())
key := fmt.Sprintf("%d-%d", job.CheckId, job.LastPointTs.Unix())

preConsider := time.Now()

Expand Down Expand Up @@ -185,23 +154,24 @@ func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error {

if err != nil {
executorAlertOutcomesErr.Inc(1)
return fmt.Errorf("Eval failed for job %q : %s", job, err.Error())
return fmt.Errorf("fatal: eval failed for job %q : %s", job, err.Error())
}

updateMonitorStateCmd := m.UpdateMonitorStateCommand{
Id: job.MonitorId,
checkState := m.CheckState{
Id: job.CheckId,
State: res,
Updated: job.LastPointTs, // this protects against jobs running out of order.
Checked: preExec,
}
if err := bus.Dispatch(&updateMonitorStateCmd); err != nil {
var affected int64
if affected, err = sqlstore.UpdateCheckState(&checkState); err != nil {
//check if we failed due to deadlock.
if err.Error() == "Error 1213: Deadlock found when trying to get lock; try restarting transaction" {
err = bus.Dispatch(&updateMonitorStateCmd)
affected, err = sqlstore.UpdateCheckState(&checkState)
}
}
if err != nil {
return fmt.Errorf("non-fatal: failed to update monitor state: %q", err)
return fmt.Errorf("non-fatal: failed to update check state: %q", err)
}
if gr, ok := gr.(*graphite.GraphiteContext); ok {
requests := ""
Expand All @@ -212,17 +182,17 @@ func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error {
resp := bytes.Replace(trace.Response, []byte("\n"), []byte("\n> "), -1)
requests += fmt.Sprintf("\ntargets: %s\nfrom:%s\nto:%s\nresponse:%s\n", r.Targets, r.Start, r.End, resp)
}
log.Debug("Job %s state_change=%t request traces: %s", job, updateMonitorStateCmd.Affected > 0, requests)
log.Debug("Job %s state_change=%t request traces: %s", job, affected > 0, requests)
}
if updateMonitorStateCmd.Affected > 0 && res != m.EvalResultUnknown {
if affected > 0 && res != m.EvalResultUnknown {
//emit a state change event.
if job.Notifications.Enabled {
emails := strings.Split(job.Notifications.Addresses, ",")
if len(emails) < 1 {
log.Debug("no email addresses provided. OrgId: %d monitorId: %d", job.OrgId, job.MonitorId)
log.Debug("no email addresses provided. OrgId: %d monitorId: %d", job.OrgId, job.CheckId)
} else {
for _, email := range emails {
log.Info("sending email. addr=%s, orgId=%d, monitorId=%d, endpointSlug=%s, state=%s", email, job.OrgId, job.MonitorId, job.EndpointSlug, res.String())
log.Info("sending email. addr=%s, orgId=%d, monitorId=%d, endpointSlug=%s, state=%s", email, job.OrgId, job.CheckId, job.EndpointSlug, res.String())
}
sendCmd := m.SendEmailCommand{
To: emails,
Expand All @@ -232,15 +202,15 @@ func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error {
"EndpointName": job.EndpointName,
"EndpointSlug": job.EndpointSlug,
"Settings": job.Settings,
"CheckType": job.MonitorTypeName,
"CheckType": job.CheckType,
"State": res.String(),
"TimeLastData": job.LastPointTs, // timestamp of the most recent data used
"TimeExec": preExec, // when we executed the alerting rule and made the determination
},
}

if err := bus.Dispatch(&sendCmd); err != nil {
log.Error(0, "failed to send email to %s. OrgId: %d monitorId: %d due to: %s", emails, job.OrgId, job.MonitorId, err)
if err := notifications.SendEmail(&sendCmd); err != nil {
log.Error(3, "failed to send email to %s. OrgId: %d monitorId: %d due to: %s", emails, job.OrgId, job.CheckId, err)
}
}
}
Expand Down
79 changes: 11 additions & 68 deletions pkg/alerting/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"github.com/grafana/grafana/pkg/log"
"github.com/hashicorp/golang-lru"
"github.com/raintank/met"
"github.com/raintank/worldping-api/pkg/services/rabbitmq"
"github.com/raintank/worldping-api/pkg/setting"
)

var jobQueueInternalItems met.Gauge
var jobQueueInternalSize met.Gauge
var jobQueuePreAMQPItems met.Gauge
var jobQueuePreAMQPSize met.Gauge

var tickQueueItems met.Meter
var tickQueueSize met.Gauge
var dispatcherJobsSkippedDueToSlowJobQueueInternal met.Count
Expand Down Expand Up @@ -56,6 +56,7 @@ func Init(metrics met.Backend) {
jobQueueInternalSize = metrics.NewGauge("alert-jobqueue-internal.size", int64(setting.InternalJobQueueSize))
jobQueuePreAMQPItems = metrics.NewGauge("alert-jobqueue-preamqp.items", 0)
jobQueuePreAMQPSize = metrics.NewGauge("alert-jobqueue-preamqp.size", int64(setting.PreAMQPJobQueueSize))

tickQueueItems = metrics.NewMeter("alert-tickqueue.items", 0)
tickQueueSize = metrics.NewGauge("alert-tickqueue.size", int64(setting.TickQueueSize))
dispatcherJobsSkippedDueToSlowJobQueueInternal = metrics.NewCount("alert-dispatcher.jobs-skipped-due-to-slow-internal-jobqueue")
Expand Down Expand Up @@ -97,80 +98,22 @@ func Construct() {
panic(fmt.Sprintf("Can't create LRU: %s", err.Error()))
}

if setting.AlertingHandler != "amqp" && setting.AlertingHandler != "builtin" {
log.Fatal(0, "alerting handler must be either 'builtin' or 'amqp'")
}
if setting.AlertingHandler == "amqp" {
sec := setting.Cfg.Section("event_publisher")
if !sec.Key("enabled").MustBool(false) {
log.Fatal(0, "alerting handler 'amqp' requires the event_publisher to be enabled")
}
url := sec.Key("rabbitmq_url").String()
if err := distributed(url, cache); err != nil {
log.Fatal(0, "failed to start amqp consumer.", err)
}
return
} else {
if !setting.EnableScheduler {
log.Fatal(0, "Alerting in standalone mode requires a scheduler (enable_scheduler = true)")
}
if setting.Executors == 0 {
log.Fatal(0, "Alerting in standalone mode requires at least 1 executor (try: executors = 10)")
}

standalone(cache)
if !setting.Rabbitmq.Enabled && !setting.EnableScheduler {
log.Fatal(3, "Alerting in standalone mode requires a scheduler (enable_scheduler = true)")
}
}

func standalone(cache *lru.Cache) {
jobQueue := newInternalJobQueue(setting.InternalJobQueueSize)

// create jobs
go Dispatcher(jobQueue)

//start group of workers to execute the checks.
for i := 0; i < setting.Executors; i++ {
go ChanExecutor(GraphiteAuthContextReturner, jobQueue, cache)
}
}
recvJobQueue := make(chan *Job, setting.InternalJobQueueSize)

func distributed(url string, cache *lru.Cache) error {
exchange := "alertingJobs"
exch := rabbitmq.Exchange{
Name: exchange,
ExchangeType: "x-consistent-hash",
Durable: true,
}
InitJobQueue(recvJobQueue)

// create jobs
if setting.EnableScheduler {
publisher := &rabbitmq.Publisher{Url: url, Exchange: &exch}
err := publisher.Connect()
if err != nil {
return err
}

jobQueue := newPreAMQPJobQueue(setting.PreAMQPJobQueueSize, publisher)

go Dispatcher(jobQueue)
go Dispatcher()
}

q := rabbitmq.Queue{
Name: "",
Durable: false,
AutoDelete: true,
Exclusive: true,
}
//start group of workers to execute the checks.
for i := 0; i < setting.Executors; i++ {
consumer := rabbitmq.Consumer{
Url: url,
Exchange: &exch,
Queue: &q,
BindingKey: []string{"10"}, //consistant hashing weight.
}
if err := consumer.Connect(); err != nil {
log.Fatal(0, "failed to start event.consumer.", err)
}
AmqpExecutor(GraphiteAuthContextReturner, consumer, cache)
go ChanExecutor(GraphiteAuthContextReturner, recvJobQueue, cache)
}
return nil

}
68 changes: 66 additions & 2 deletions pkg/alerting/jobqueue.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,69 @@
package alerting

type JobQueue interface {
Put(job *Job)
import (
"encoding/json"
"strconv"

"github.com/grafana/grafana/pkg/log"
"github.com/raintank/worldping-api/pkg/alerting/jobqueue"
"github.com/raintank/worldping-api/pkg/setting"
)

var (
pubChan chan jobqueue.Message
subChan chan jobqueue.Message
)

func InitJobQueue(jobQueue chan<- *Job) {

if setting.Rabbitmq.Enabled {
pubChan = make(chan jobqueue.Message, setting.PreAMQPJobQueueSize)
// use rabbitmq for message distribution.

//subchan is unbuffered as the consumer creates a goroutine for
// every message recieved.
subChan = make(chan jobqueue.Message)
go jobqueue.Run(setting.Rabbitmq.Url, "alertingJobs", pubChan, subChan)
go handleJobs(subChan, jobQueue)
} else {
pubChan = make(chan jobqueue.Message, setting.InternalJobQueueSize)
// handle all message written to the publish chan.
go handleJobs(pubChan, jobQueue)
}
return
}

func PublishJob(job *Job) error {
body, err := json.Marshal(job)
if err != nil {
return err
}
msg := jobqueue.Message{
RoutingKey: strconv.FormatInt(job.CheckId, 10),
Payload: body,
}
if setting.Rabbitmq.Enabled {
jobQueuePreAMQPItems.Value(int64(len(pubChan)))
jobQueuePreAMQPSize.Value(int64(setting.PreAMQPJobQueueSize))
} else {
jobQueueInternalItems.Value(int64(len(pubChan)))
jobQueueInternalSize.Value(int64(setting.InternalJobQueueSize))
}

pubChan <- msg
return nil
}

func handleJobs(c chan jobqueue.Message, jobQueue chan<- *Job) {
for m := range c {
go func(msg jobqueue.Message) {
j := &Job{}
err := json.Unmarshal(msg.Payload, j)
if err != nil {
log.Error(3, "unable to unmarshal Job. %s", err)
return
}
jobQueue <- j
}(m)
}
}
Loading

0 comments on commit a6bbc8a

Please sign in to comment.