From a6bbc8a7992d76c14fd64fbb1a890ab18bdbb65a Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 3 Jun 2016 08:41:38 +0800 Subject: [PATCH] refactor alerting - use new rabbitmq pubsub code. fixes #34 fixes #27 - remove executor goroutines in favor of just spawing as many goroutines as are needed to handle alerting load. - dont retry failed jobs. The code that was in place didnt work due to the lru cache, retried jobs would just be skipped due to being in the cache already. issue #14 --- main.go | 10 ++-- pkg/alerting/executor.go | 88 ++++++++++--------------------- pkg/alerting/init.go | 79 ++++----------------------- pkg/alerting/jobqueue.go | 68 +++++++++++++++++++++++- pkg/alerting/jobqueue_internal.go | 24 --------- pkg/alerting/jobqueue_preamqp.go | 49 ----------------- pkg/alerting/offset.go | 20 +++---- pkg/alerting/schedule.go | 85 ++++++++++++++--------------- pkg/alerting/scheduler.go | 8 +-- pkg/models/endpoint.go | 35 ++++++++++-- pkg/services/sqlstore/endpoint.go | 82 ++++++++++++++++++++++++++++ 11 files changed, 275 insertions(+), 273 deletions(-) delete mode 100644 pkg/alerting/jobqueue_internal.go delete mode 100644 pkg/alerting/jobqueue_preamqp.go diff --git a/main.go b/main.go index cc4958f0..8f56f191 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( "github.com/Dieterbe/profiletrigger/heap" "github.com/grafana/grafana/pkg/log" "github.com/raintank/met/helper" - //"github.com/raintank/worldping-api/pkg/alerting" + "github.com/raintank/worldping-api/pkg/alerting" "github.com/raintank/worldping-api/pkg/api" "github.com/raintank/worldping-api/pkg/cmd" "github.com/raintank/worldping-api/pkg/events" @@ -74,10 +74,10 @@ func main() { collectoreventpublisher.Init(metricsBackend) api.InitCollectorController(metricsBackend) - //if setting.AlertingEnabled { - // alerting.Init(metricsBackend) - // alerting.Construct() - //} + if setting.AlertingEnabled { + alerting.Init(metricsBackend) + alerting.Construct() + } if err := notifications.Init(); err != nil { log.Fatal(3, "Notification service failed to initialize", err) diff --git a/pkg/alerting/executor.go b/pkg/alerting/executor.go index 586994f2..b50b4b32 100644 --- a/pkg/alerting/executor.go +++ b/pkg/alerting/executor.go @@ -2,7 +2,6 @@ package alerting import ( "bytes" - "encoding/json" "fmt" "net/http" "net/url" @@ -12,12 +11,11 @@ import ( "github.com/grafana/grafana/pkg/log" "github.com/hashicorp/golang-lru" - "github.com/raintank/worldping-api/pkg/bus" "github.com/raintank/worldping-api/pkg/graphite" m "github.com/raintank/worldping-api/pkg/models" - "github.com/raintank/worldping-api/pkg/services/rabbitmq" + "github.com/raintank/worldping-api/pkg/services/notifications" + "github.com/raintank/worldping-api/pkg/services/sqlstore" "github.com/raintank/worldping-api/pkg/setting" - "github.com/streadway/amqp" bgraphite "bosun.org/graphite" ) @@ -41,54 +39,25 @@ func GraphiteAuthContextReturner(org_id int64) (bgraphite.Context, error) { return &ctx, nil } -func ChanExecutor(fn GraphiteReturner, jobQueue JobQueue, cache *lru.Cache) { +func ChanExecutor(fn GraphiteReturner, jobQueue <-chan *Job, cache *lru.Cache) { executorNum.Inc(1) defer executorNum.Dec(1) - realQueue := jobQueue.(internalJobQueue).queue - - for job := range realQueue { - jobQueueInternalItems.Value(int64(len(realQueue))) - jobQueueInternalSize.Value(int64(setting.InternalJobQueueSize)) - if setting.AlertingInspect { - inspect(fn, job, cache) - } else { - execute(fn, job, cache) - } - } -} - -// AmqpExecutor reads jobs from rabbitmq, executes them, 
and acknowledges them -// if they processed succesfully or encountered a fatal error -// (i.e. an error that we know won't recover on future retries, so no point in retrying) -func AmqpExecutor(fn GraphiteReturner, consumer rabbitmq.Consumer, cache *lru.Cache) { - executorNum.Inc(1) - defer executorNum.Dec(1) - consumer.Consume(func(msg *amqp.Delivery) error { - job := Job{} - if err := json.Unmarshal(msg.Body, &job); err != nil { - log.Error(0, "failed to unmarshal msg body.", err) - return nil - } - var err error - if setting.AlertingInspect { - inspect(GraphiteAuthContextReturner, &job, cache) - } else { - err = execute(GraphiteAuthContextReturner, &job, cache) - } - if err != nil { - if strings.HasPrefix(err.Error(), "fatal:") { - log.Error(0, "%s: removing job from queue", err.Error()) - return nil + for j := range jobQueue { + go func(job *Job) { + jobQueueInternalItems.Value(int64(len(jobQueue))) + jobQueueInternalSize.Value(int64(setting.InternalJobQueueSize)) + if setting.AlertingInspect { + inspect(fn, job, cache) + } else { + execute(fn, job, cache) } - log.Error(0, "%s: not acking message. retry later", err.Error()) - } - return err - }) + }(j) + } } func inspect(fn GraphiteReturner, job *Job, cache *lru.Cache) { - key := fmt.Sprintf("%d-%d", job.MonitorId, job.LastPointTs.Unix()) + key := fmt.Sprintf("%d-%d", job.CheckId, job.LastPointTs.Unix()) if found, _ := cache.ContainsOrAdd(key, true); found { //log.Debug("Job %s already done", job) return @@ -116,7 +85,7 @@ func inspect(fn GraphiteReturner, job *Job, cache *lru.Cache) { // errors are always prefixed with 'non-fatal' (i.e. error condition that imply retrying the job later might fix it) // or 'fatal', when we're sure the job will never process successfully. func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error { - key := fmt.Sprintf("%d-%d", job.MonitorId, job.LastPointTs.Unix()) + key := fmt.Sprintf("%d-%d", job.CheckId, job.LastPointTs.Unix()) preConsider := time.Now() @@ -185,23 +154,24 @@ func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error { if err != nil { executorAlertOutcomesErr.Inc(1) - return fmt.Errorf("Eval failed for job %q : %s", job, err.Error()) + return fmt.Errorf("fatal: eval failed for job %q : %s", job, err.Error()) } - updateMonitorStateCmd := m.UpdateMonitorStateCommand{ - Id: job.MonitorId, + checkState := m.CheckState{ + Id: job.CheckId, State: res, Updated: job.LastPointTs, // this protects against jobs running out of order. Checked: preExec, } - if err := bus.Dispatch(&updateMonitorStateCmd); err != nil { + var affected int64 + if affected, err = sqlstore.UpdateCheckState(&checkState); err != nil { //check if we failed due to deadlock. 
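(The retry that follows matches MySQL's deadlock error and simply re-issues the update once.) A standalone sketch of that retry-once pattern, with a hypothetical update callback standing in for sqlstore.UpdateCheckState; the patch compares the full error text, the sketch uses a looser substring match:

package main

import (
	"errors"
	"fmt"
	"strings"
)

// retryOnDeadlock runs update once and, if MySQL reports a deadlock
// (error 1213), runs it one more time before giving up.
func retryOnDeadlock(update func() (int64, error)) (int64, error) {
	affected, err := update()
	if err != nil && strings.Contains(err.Error(), "Error 1213") {
		affected, err = update()
	}
	return affected, err
}

func main() {
	calls := 0
	affected, err := retryOnDeadlock(func() (int64, error) {
		calls++
		if calls == 1 {
			return 0, errors.New("Error 1213: Deadlock found when trying to get lock; try restarting transaction")
		}
		return 1, nil
	})
	fmt.Printf("affected=%d err=%v calls=%d\n", affected, err, calls) // affected=1 err=<nil> calls=2
}

Anything more persistent than a single deadlock still surfaces as the "non-fatal" error above, so the job is dropped rather than retried, per the commit message.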
if err.Error() == "Error 1213: Deadlock found when trying to get lock; try restarting transaction" { - err = bus.Dispatch(&updateMonitorStateCmd) + affected, err = sqlstore.UpdateCheckState(&checkState) } } if err != nil { - return fmt.Errorf("non-fatal: failed to update monitor state: %q", err) + return fmt.Errorf("non-fatal: failed to update check state: %q", err) } if gr, ok := gr.(*graphite.GraphiteContext); ok { requests := "" @@ -212,17 +182,17 @@ func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error { resp := bytes.Replace(trace.Response, []byte("\n"), []byte("\n> "), -1) requests += fmt.Sprintf("\ntargets: %s\nfrom:%s\nto:%s\nresponse:%s\n", r.Targets, r.Start, r.End, resp) } - log.Debug("Job %s state_change=%t request traces: %s", job, updateMonitorStateCmd.Affected > 0, requests) + log.Debug("Job %s state_change=%t request traces: %s", job, affected > 0, requests) } - if updateMonitorStateCmd.Affected > 0 && res != m.EvalResultUnknown { + if affected > 0 && res != m.EvalResultUnknown { //emit a state change event. if job.Notifications.Enabled { emails := strings.Split(job.Notifications.Addresses, ",") if len(emails) < 1 { - log.Debug("no email addresses provided. OrgId: %d monitorId: %d", job.OrgId, job.MonitorId) + log.Debug("no email addresses provided. OrgId: %d monitorId: %d", job.OrgId, job.CheckId) } else { for _, email := range emails { - log.Info("sending email. addr=%s, orgId=%d, monitorId=%d, endpointSlug=%s, state=%s", email, job.OrgId, job.MonitorId, job.EndpointSlug, res.String()) + log.Info("sending email. addr=%s, orgId=%d, monitorId=%d, endpointSlug=%s, state=%s", email, job.OrgId, job.CheckId, job.EndpointSlug, res.String()) } sendCmd := m.SendEmailCommand{ To: emails, @@ -232,15 +202,15 @@ func execute(fn GraphiteReturner, job *Job, cache *lru.Cache) error { "EndpointName": job.EndpointName, "EndpointSlug": job.EndpointSlug, "Settings": job.Settings, - "CheckType": job.MonitorTypeName, + "CheckType": job.CheckType, "State": res.String(), "TimeLastData": job.LastPointTs, // timestamp of the most recent data used "TimeExec": preExec, // when we executed the alerting rule and made the determination }, } - if err := bus.Dispatch(&sendCmd); err != nil { - log.Error(0, "failed to send email to %s. OrgId: %d monitorId: %d due to: %s", emails, job.OrgId, job.MonitorId, err) + if err := notifications.SendEmail(&sendCmd); err != nil { + log.Error(3, "failed to send email to %s. 
OrgId: %d monitorId: %d due to: %s", emails, job.OrgId, job.CheckId, err) } } } diff --git a/pkg/alerting/init.go b/pkg/alerting/init.go index 6817a6d5..024e8d56 100644 --- a/pkg/alerting/init.go +++ b/pkg/alerting/init.go @@ -7,7 +7,6 @@ import ( "github.com/grafana/grafana/pkg/log" "github.com/hashicorp/golang-lru" "github.com/raintank/met" - "github.com/raintank/worldping-api/pkg/services/rabbitmq" "github.com/raintank/worldping-api/pkg/setting" ) @@ -15,6 +14,7 @@ var jobQueueInternalItems met.Gauge var jobQueueInternalSize met.Gauge var jobQueuePreAMQPItems met.Gauge var jobQueuePreAMQPSize met.Gauge + var tickQueueItems met.Meter var tickQueueSize met.Gauge var dispatcherJobsSkippedDueToSlowJobQueueInternal met.Count @@ -56,6 +56,7 @@ func Init(metrics met.Backend) { jobQueueInternalSize = metrics.NewGauge("alert-jobqueue-internal.size", int64(setting.InternalJobQueueSize)) jobQueuePreAMQPItems = metrics.NewGauge("alert-jobqueue-preamqp.items", 0) jobQueuePreAMQPSize = metrics.NewGauge("alert-jobqueue-preamqp.size", int64(setting.PreAMQPJobQueueSize)) + tickQueueItems = metrics.NewMeter("alert-tickqueue.items", 0) tickQueueSize = metrics.NewGauge("alert-tickqueue.size", int64(setting.TickQueueSize)) dispatcherJobsSkippedDueToSlowJobQueueInternal = metrics.NewCount("alert-dispatcher.jobs-skipped-due-to-slow-internal-jobqueue") @@ -97,80 +98,22 @@ func Construct() { panic(fmt.Sprintf("Can't create LRU: %s", err.Error())) } - if setting.AlertingHandler != "amqp" && setting.AlertingHandler != "builtin" { - log.Fatal(0, "alerting handler must be either 'builtin' or 'amqp'") - } - if setting.AlertingHandler == "amqp" { - sec := setting.Cfg.Section("event_publisher") - if !sec.Key("enabled").MustBool(false) { - log.Fatal(0, "alerting handler 'amqp' requires the event_publisher to be enabled") - } - url := sec.Key("rabbitmq_url").String() - if err := distributed(url, cache); err != nil { - log.Fatal(0, "failed to start amqp consumer.", err) - } - return - } else { - if !setting.EnableScheduler { - log.Fatal(0, "Alerting in standalone mode requires a scheduler (enable_scheduler = true)") - } - if setting.Executors == 0 { - log.Fatal(0, "Alerting in standalone mode requires at least 1 executor (try: executors = 10)") - } - - standalone(cache) + if !setting.Rabbitmq.Enabled && !setting.EnableScheduler { + log.Fatal(3, "Alerting in standalone mode requires a scheduler (enable_scheduler = true)") } -} - -func standalone(cache *lru.Cache) { - jobQueue := newInternalJobQueue(setting.InternalJobQueueSize) - // create jobs - go Dispatcher(jobQueue) - - //start group of workers to execute the checks. - for i := 0; i < setting.Executors; i++ { - go ChanExecutor(GraphiteAuthContextReturner, jobQueue, cache) - } -} + recvJobQueue := make(chan *Job, setting.InternalJobQueueSize) -func distributed(url string, cache *lru.Cache) error { - exchange := "alertingJobs" - exch := rabbitmq.Exchange{ - Name: exchange, - ExchangeType: "x-consistent-hash", - Durable: true, - } + InitJobQueue(recvJobQueue) + // create jobs if setting.EnableScheduler { - publisher := &rabbitmq.Publisher{Url: url, Exchange: &exch} - err := publisher.Connect() - if err != nil { - return err - } - - jobQueue := newPreAMQPJobQueue(setting.PreAMQPJobQueueSize, publisher) - - go Dispatcher(jobQueue) + go Dispatcher() } - q := rabbitmq.Queue{ - Name: "", - Durable: false, - AutoDelete: true, - Exclusive: true, - } + //start group of workers to execute the checks. 
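Construct() now starts a fixed set of ChanExecutor readers on one shared channel, and each reader (see the executor.go hunk above) hands every received job to its own goroutine instead of keeping a pool of executor workers. A standalone sketch of that consume-and-spawn pattern with stand-in types — the job struct, handle callback, and WaitGroup bookkeeping are illustrative, not the patch's Job or ChanExecutor signatures:

package main

import (
	"fmt"
	"sync"
)

type job struct{ checkId int64 }

// chanExecutor drains the queue and hands every job to its own goroutine,
// so one slow check never blocks the checks queued behind it.
func chanExecutor(queue <-chan *job, handle func(*job)) {
	for j := range queue {
		go func(j *job) {
			handle(j)
		}(j)
	}
}

func main() {
	queue := make(chan *job, 10) // stand-in for setting.InternalJobQueueSize
	var wg sync.WaitGroup

	go chanExecutor(queue, func(j *job) {
		defer wg.Done()
		fmt.Println("evaluated check", j.checkId)
	})

	for i := int64(1); i <= 3; i++ {
		wg.Add(1)
		queue <- &job{checkId: i}
	}
	wg.Wait()
	close(queue)
}

Concurrency is therefore bounded only by how fast jobs arrive, which is the trade-off the commit message makes explicit.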
for i := 0; i < setting.Executors; i++ { - consumer := rabbitmq.Consumer{ - Url: url, - Exchange: &exch, - Queue: &q, - BindingKey: []string{"10"}, //consistant hashing weight. - } - if err := consumer.Connect(); err != nil { - log.Fatal(0, "failed to start event.consumer.", err) - } - AmqpExecutor(GraphiteAuthContextReturner, consumer, cache) + go ChanExecutor(GraphiteAuthContextReturner, recvJobQueue, cache) } - return nil + } diff --git a/pkg/alerting/jobqueue.go b/pkg/alerting/jobqueue.go index 03739b56..2d15f3ec 100644 --- a/pkg/alerting/jobqueue.go +++ b/pkg/alerting/jobqueue.go @@ -1,5 +1,69 @@ package alerting -type JobQueue interface { - Put(job *Job) +import ( + "encoding/json" + "strconv" + + "github.com/grafana/grafana/pkg/log" + "github.com/raintank/worldping-api/pkg/alerting/jobqueue" + "github.com/raintank/worldping-api/pkg/setting" +) + +var ( + pubChan chan jobqueue.Message + subChan chan jobqueue.Message +) + +func InitJobQueue(jobQueue chan<- *Job) { + + if setting.Rabbitmq.Enabled { + pubChan = make(chan jobqueue.Message, setting.PreAMQPJobQueueSize) + // use rabbitmq for message distribution. + + //subchan is unbuffered as the consumer creates a goroutine for + // every message recieved. + subChan = make(chan jobqueue.Message) + go jobqueue.Run(setting.Rabbitmq.Url, "alertingJobs", pubChan, subChan) + go handleJobs(subChan, jobQueue) + } else { + pubChan = make(chan jobqueue.Message, setting.InternalJobQueueSize) + // handle all message written to the publish chan. + go handleJobs(pubChan, jobQueue) + } + return +} + +func PublishJob(job *Job) error { + body, err := json.Marshal(job) + if err != nil { + return err + } + msg := jobqueue.Message{ + RoutingKey: strconv.FormatInt(job.CheckId, 10), + Payload: body, + } + if setting.Rabbitmq.Enabled { + jobQueuePreAMQPItems.Value(int64(len(pubChan))) + jobQueuePreAMQPSize.Value(int64(setting.PreAMQPJobQueueSize)) + } else { + jobQueueInternalItems.Value(int64(len(pubChan))) + jobQueueInternalSize.Value(int64(setting.InternalJobQueueSize)) + } + + pubChan <- msg + return nil +} + +func handleJobs(c chan jobqueue.Message, jobQueue chan<- *Job) { + for m := range c { + go func(msg jobqueue.Message) { + j := &Job{} + err := json.Unmarshal(msg.Payload, j) + if err != nil { + log.Error(3, "unable to unmarshal Job. 
%s", err) + return + } + jobQueue <- j + }(m) + } } diff --git a/pkg/alerting/jobqueue_internal.go b/pkg/alerting/jobqueue_internal.go deleted file mode 100644 index 41ccb9c2..00000000 --- a/pkg/alerting/jobqueue_internal.go +++ /dev/null @@ -1,24 +0,0 @@ -package alerting - -type internalJobQueue struct { - size int - queue chan *Job -} - -func newInternalJobQueue(size int) internalJobQueue { - return internalJobQueue{ - size, - make(chan *Job, size), - } -} - -func (jq internalJobQueue) Put(job *Job) { - jobQueueInternalItems.Value(int64(len(jq.queue))) - jobQueueInternalSize.Value(int64(jq.size)) - - select { - case jq.queue <- job: - default: - dispatcherJobsSkippedDueToSlowJobQueueInternal.Inc(1) - } -} diff --git a/pkg/alerting/jobqueue_preamqp.go b/pkg/alerting/jobqueue_preamqp.go deleted file mode 100644 index 0fd5b237..00000000 --- a/pkg/alerting/jobqueue_preamqp.go +++ /dev/null @@ -1,49 +0,0 @@ -package alerting - -import ( - "encoding/json" - "fmt" - - "github.com/grafana/grafana/pkg/log" - "github.com/raintank/worldping-api/pkg/services/rabbitmq" -) - -type PreAMQPJobQueue struct { - size int - queue chan *Job - publisher *rabbitmq.Publisher -} - -func newPreAMQPJobQueue(size int, publisher *rabbitmq.Publisher) PreAMQPJobQueue { - jq := PreAMQPJobQueue{ - size, - make(chan *Job, size), - publisher, - } - go jq.run() - return jq -} - -//send dispatched jobs to rabbitmq. -func (jq PreAMQPJobQueue) run() { - for job := range jq.queue { - routingKey := fmt.Sprintf("%d", job.MonitorId) - msg, err := json.Marshal(job) - if err != nil { - log.Error(3, "failed to marshal job to json: %s", err) - continue - } - jq.publisher.Publish(routingKey, msg) - } -} - -func (jq PreAMQPJobQueue) Put(job *Job) { - jobQueuePreAMQPItems.Value(int64(len(jq.queue))) - jobQueuePreAMQPSize.Value(int64(jq.size)) - - select { - case jq.queue <- job: - default: - dispatcherJobsSkippedDueToSlowJobQueuePreAMQP.Inc(1) - } -} diff --git a/pkg/alerting/offset.go b/pkg/alerting/offset.go index 4d88ebbf..2234048c 100644 --- a/pkg/alerting/offset.go +++ b/pkg/alerting/offset.go @@ -5,36 +5,28 @@ import ( "strconv" "github.com/grafana/grafana/pkg/log" - "github.com/raintank/worldping-api/pkg/bus" - m "github.com/raintank/worldping-api/pkg/models" + "github.com/raintank/worldping-api/pkg/services/sqlstore" ) func LoadOrSetOffset() int { - query := m.GetAlertSchedulerValueQuery{ - Id: "offset", - } - err := bus.Dispatch(&query) + offset, err := sqlstore.GetAlertSchedulerValue("offset") if err != nil { panic(fmt.Sprintf("failure querying for current offset: %q", err)) } - if query.Result == "" { + if offset == "" { log.Debug("initializing offset to default value of 30 seconds.") setOffset(30) return 30 } - i, err := strconv.Atoi(query.Result) + i, err := strconv.Atoi(offset) if err != nil { - panic(fmt.Sprintf("failure reading in offset: %q. input value was: %q", err, query.Result)) + panic(fmt.Sprintf("failure reading in offset: %q. 
input value was: %q", err, offset)) } return i } func setOffset(offset int) { - update := m.UpdateAlertSchedulerValueCommand{ - Id: "offset", - Value: fmt.Sprintf("%d", offset), - } - err := bus.Dispatch(&update) + err := sqlstore.UpdateAlertSchedulerValue("offset", fmt.Sprintf("%d", offset)) if err != nil { log.Error(0, "Could not persist offset: %q", err) } diff --git a/pkg/alerting/schedule.go b/pkg/alerting/schedule.go index 38104a73..4cd4f22d 100644 --- a/pkg/alerting/schedule.go +++ b/pkg/alerting/schedule.go @@ -8,9 +8,9 @@ import ( "time" "github.com/raintank/raintank-metric/schema" - "github.com/raintank/worldping-api/pkg/bus" m "github.com/raintank/worldping-api/pkg/models" "github.com/raintank/worldping-api/pkg/services/metricpublisher" + "github.com/raintank/worldping-api/pkg/services/sqlstore" "github.com/raintank/worldping-api/pkg/setting" ) @@ -20,13 +20,13 @@ import ( // that said, for convenience, we track the generatedAt timestamp type Job struct { OrgId int64 - MonitorId int64 + CheckId int64 EndpointId int64 EndpointName string EndpointSlug string - Settings map[string]string - MonitorTypeName string - Notifications m.MonitorNotificationSetting + Settings map[string]interface{} + CheckType string + Notifications m.CheckNotificationSetting Freq int64 Offset int64 // offset on top of "even" minute/10s/.. intervals Definition CheckDef @@ -39,7 +39,7 @@ type Job struct { } func (job Job) String() string { - return fmt.Sprintf(" monitorId=%d generatedAt=%s lastPointTs=%s definition: %s", job.MonitorId, job.GeneratedAt, job.LastPointTs, job.Definition) + return fmt.Sprintf(" checkId=%d generatedAt=%s lastPointTs=%s definition: %s", job.CheckId, job.GeneratedAt, job.LastPointTs, job.Definition) } func (job Job) StoreResult(res m.CheckEvalResult) { @@ -51,8 +51,8 @@ func (job Job) StoreResult(res m.CheckEvalResult) { for pos, state := range metricNames { metrics[pos] = &schema.MetricData{ OrgId: int(job.OrgId), - Name: fmt.Sprintf("health.%s.%s.%s", job.EndpointSlug, strings.ToLower(job.MonitorTypeName), state), - Metric: fmt.Sprintf("health.%s.%s", strings.ToLower(job.MonitorTypeName), state), + Name: fmt.Sprintf("health.%s.%s.%s", job.EndpointSlug, strings.ToLower(job.CheckType), state), + Metric: fmt.Sprintf("health.%s.%s", strings.ToLower(job.CheckType), state), Interval: int(job.Freq), Value: 0.0, Unit: "state", @@ -60,7 +60,7 @@ func (job Job) StoreResult(res m.CheckEvalResult) { TargetType: "gauge", Tags: []string{ fmt.Sprintf("endpoint_id:%d", job.EndpointId), - fmt.Sprintf("monitor_id:%d", job.MonitorId), + fmt.Sprintf("monitor_id:%d", job.CheckId), }, } metrics[pos].SetId() @@ -79,17 +79,14 @@ func (job *Job) assertStart() { // getJobs retrieves all jobs for which lastPointAt % their freq == their offset. 
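getJobs below only pulls checks whose offset equals the timestamp modulo their frequency, which is what spreads work across each check's interval. A tiny sketch of that predicate (the timestamps are illustrative):

package main

import "fmt"

// due reports whether a check with the given frequency and offset
// (both in seconds) should fire for the timestamp ts.
func due(ts, frequency, offset int64) bool {
	return ts%frequency == offset
}

func main() {
	// a check with frequency=60, offset=7 fires at :07 of every minute
	fmt.Println(due(1467331207, 60, 7)) // true  (1467331207 % 60 == 7)
	fmt.Println(due(1467331208, 60, 7)) // false
}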
func getJobs(lastPointAt int64) ([]*Job, error) { - query := m.GetMonitorsForAlertsQuery{ - Timestamp: lastPointAt, - } - - if err := bus.Dispatch(&query); err != nil { + checks, err := sqlstore.GetChecksForAlerts(lastPointAt) + if err != nil { return nil, err } jobs := make([]*Job, 0) - for _, monitor := range query.Result { - job := buildJobForMonitor(monitor) + for _, monitor := range checks { + job := buildJobForMonitor(&monitor) if job != nil { jobs = append(jobs, job) } @@ -98,24 +95,24 @@ func getJobs(lastPointAt int64) ([]*Job, error) { return jobs, nil } -func buildJobForMonitor(monitor *m.MonitorForAlertDTO) *Job { +func buildJobForMonitor(check *m.CheckForAlertDTO) *Job { //state could in theory be ok, warn, error, but we only use ok vs error for now - if monitor.HealthSettings == nil { + if check.HealthSettings == nil { return nil } - if monitor.Frequency == 0 || monitor.HealthSettings.Steps == 0 || monitor.HealthSettings.NumCollectors == 0 { + if check.Frequency == 0 || check.HealthSettings.Steps == 0 || check.HealthSettings.NumProbes == 0 { //fmt.Printf("bad monitor definition given: %#v", monitor) return nil } type Settings struct { - EndpointSlug string - MonitorTypeName string - Duration string - NumCollectors int - Steps int + EndpointSlug string + CheckType string + Duration string + NumProbes int + Steps int } // graphite behaves like so: @@ -126,11 +123,11 @@ func buildJobForMonitor(monitor *m.MonitorForAlertDTO) *Job { // we can just query from 970 settings := Settings{ - EndpointSlug: monitor.EndpointSlug, - MonitorTypeName: monitor.MonitorTypeName, - Duration: fmt.Sprintf("%d", int64(monitor.HealthSettings.Steps)*monitor.Frequency), - NumCollectors: monitor.HealthSettings.NumCollectors, - Steps: monitor.HealthSettings.Steps, + EndpointSlug: check.Slug, + CheckType: string(check.Type), + Duration: fmt.Sprintf("%d", int64(check.HealthSettings.Steps)*check.Frequency), + NumProbes: check.HealthSettings.NumProbes, + Steps: check.HealthSettings.Steps, } funcMap := template.FuncMap{ @@ -147,8 +144,8 @@ func buildJobForMonitor(monitor *m.MonitorForAlertDTO) *Job { // note: it may look like the end of the queried interval is ambiguous here, and if offset > frequency, may include "too recent" values by accident. 
// fear not, as when we execute the alert in the executor, we set the lastPointTs as end time - target := `litmus.{{.EndpointSlug}}.*.{{.MonitorTypeName | ToLower }}.error_state` - tpl := `sum(t(streak(graphite("` + target + `", "{{.Duration}}s", "", "")) == {{.Steps}} , "")) >= {{.NumCollectors}}` + target := `litmus.{{.EndpointSlug}}.*.{{.CheckType | ToLower }}.error_state` + tpl := `sum(t(streak(graphite("` + target + `", "{{.Duration}}s", "", "")) == {{.Steps}} , "")) >= {{.NumProbes}}` var t = template.Must(template.New("query").Funcs(funcMap).Parse(tpl)) var b bytes.Buffer @@ -157,23 +154,23 @@ func buildJobForMonitor(monitor *m.MonitorForAlertDTO) *Job { panic(fmt.Sprintf("Could not execute alert query template: %q", err)) } j := &Job{ - MonitorId: monitor.Id, - EndpointId: monitor.EndpointId, - EndpointName: monitor.EndpointName, - EndpointSlug: monitor.EndpointSlug, - Settings: monitor.SettingsMap(), - MonitorTypeName: monitor.MonitorTypeName, - Notifications: monitor.HealthSettings.Notifications, - OrgId: monitor.OrgId, - Freq: monitor.Frequency, - Offset: monitor.Offset, + CheckId: check.Id, + EndpointId: check.EndpointId, + EndpointName: check.Name, + EndpointSlug: check.Slug, + Settings: check.Settings, + CheckType: string(check.Type), + Notifications: check.HealthSettings.Notifications, + OrgId: check.OrgId, + Freq: check.Frequency, + Offset: check.Offset, Definition: CheckDef{ CritExpr: b.String(), WarnExpr: "0", // for now we have only good or bad. so only crit is needed }, - AssertMinSeries: monitor.HealthSettings.NumCollectors, - AssertStep: int(monitor.Frequency), - AssertSteps: monitor.HealthSettings.Steps, + AssertMinSeries: check.HealthSettings.NumProbes, + AssertStep: int(check.Frequency), + AssertSteps: check.HealthSettings.Steps, } return j } diff --git a/pkg/alerting/scheduler.go b/pkg/alerting/scheduler.go index db4f3ec6..b59c3272 100644 --- a/pkg/alerting/scheduler.go +++ b/pkg/alerting/scheduler.go @@ -15,8 +15,8 @@ var tickQueue = make(chan time.Time, setting.TickQueueSize) // Dispatcher dispatches, every second, all jobs that should run for that second // every job has an id so that you can run multiple dispatchers (for HA) while still only processing each job once. // (provided jobs get consistently routed to executors) -func Dispatcher(jobQueue JobQueue) { - go dispatchJobs(jobQueue) +func Dispatcher() { + go dispatchJobs() offset := time.Duration(LoadOrSetOffset()) * time.Second // no need to try resuming where we left off in the past. 
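buildJobForMonitor (above) renders the crit expression from a text/template, lower-casing the check type and filling in the duration, step count, and probe quorum. A standalone sketch of that rendering using the same target and expression shape; the endpoint slug, check type, and numbers are stand-in values:

package main

import (
	"bytes"
	"fmt"
	"strings"
	"text/template"
)

type settings struct {
	EndpointSlug string
	CheckType    string
	Duration     string
	NumProbes    int
	Steps        int
}

func main() {
	funcMap := template.FuncMap{"ToLower": strings.ToLower}

	target := `litmus.{{.EndpointSlug}}.*.{{.CheckType | ToLower }}.error_state`
	tpl := `sum(t(streak(graphite("` + target + `", "{{.Duration}}s", "", "")) == {{.Steps}} , "")) >= {{.NumProbes}}`
	t := template.Must(template.New("query").Funcs(funcMap).Parse(tpl))

	var b bytes.Buffer
	// stand-in values: a check with frequency 60s, 3 steps, quorum of 2 probes
	err := t.Execute(&b, settings{
		EndpointSlug: "example_com",
		CheckType:    "HTTP",
		Duration:     fmt.Sprintf("%d", 3*60),
		NumProbes:    2,
		Steps:        3,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(b.String())
	// sum(t(streak(graphite("litmus.example_com.*.http.error_state", "180s", "", "")) == 3 , "")) >= 2
}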
// see https://github.com/raintank/grafana/issues/266 @@ -52,7 +52,7 @@ func Dispatcher(jobQueue JobQueue) { } } -func dispatchJobs(jobQueue JobQueue) { +func dispatchJobs() { for lastPointAt := range tickQueue { tickQueueItems.Value(int64(len(tickQueue))) tickQueueSize.Value(int64(setting.TickQueueSize)) @@ -73,7 +73,7 @@ func dispatchJobs(jobQueue JobQueue) { job.LastPointTs = lastPointAt job.assertStart() - jobQueue.Put(job) + PublishJob(job) dispatcherJobsScheduled.Inc(1) } diff --git a/pkg/models/endpoint.go b/pkg/models/endpoint.go index 7820902e..6dec844f 100644 --- a/pkg/models/endpoint.go +++ b/pkg/models/endpoint.go @@ -77,12 +77,12 @@ type Check struct { } type CheckHealthSettings struct { - NumProbes int `json:"num_collectors" binding:"Required"` - Steps int `json:"steps" binding:"Required"` - Notifications MonitorNotificationSetting `json:"notifications"` + NumProbes int `json:"num_collectors" binding:"Required"` + Steps int `json:"steps" binding:"Required"` + Notifications CheckNotificationSetting `json:"notifications"` } -type MonitorNotificationSetting struct { +type CheckNotificationSetting struct { Enabled bool `json:"enabled"` Addresses string `json:"addresses"` } @@ -199,3 +199,30 @@ type GetEndpointsQuery struct { Limit int `form:"limit", binding:"Range(0,100)"` Page int `form:"page"` } + +//Alerting + +type CheckState struct { + Id int64 + State CheckEvalResult + Updated time.Time // this protects against jobs running out of order. + Checked time.Time +} + +type CheckForAlertDTO struct { + Id int64 + OrgId int64 + EndpointId int64 + Slug string + Name string + Type string + Offset int64 + Frequency int64 + Enabled bool + StateChange time.Time + StateCheck time.Time + Settings map[string]interface{} `xorm:"JSON"` + HealthSettings *CheckHealthSettings `xorm:"JSON"` + Created time.Time + Updated time.Time +} diff --git a/pkg/services/sqlstore/endpoint.go b/pkg/services/sqlstore/endpoint.go index 79fc0609..a14e8fd3 100644 --- a/pkg/services/sqlstore/endpoint.go +++ b/pkg/services/sqlstore/endpoint.go @@ -22,6 +22,20 @@ func (endpointRows) TableName() string { return "endpoint" } +// scrutinizeState fixes the state. We can't just trust what the database says, we have to verify that the value actually has been updated recently. +// we can simply do this by requiring that the value has been updated since 2*frequency ago. 
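The comment above describes the staleness rule: a stored state only counts if it was refreshed recently, otherwise it is demoted to unknown (the comment says two check intervals; the function below uses three, so the sketch takes the multiplier as a parameter). A standalone sketch of that rule with stand-in types, not the patch's m.Check:

package main

import (
	"fmt"
	"time"
)

type checkState int

const (
	stateUnknown checkState = iota
	stateOK
	stateError
)

type check struct {
	State      checkState
	Frequency  int64 // seconds
	StateCheck time.Time
}

// scrutinize demotes the state to unknown when it has not been re-checked
// within maxIntervals * Frequency seconds of now.
func scrutinize(now time.Time, c *check, maxIntervals int64) {
	if c.State == stateUnknown {
		return
	}
	oldest := now.Add(-time.Duration(maxIntervals*c.Frequency) * time.Second)
	if c.StateCheck.Before(oldest) {
		c.State = stateUnknown
	}
}

func main() {
	now := time.Now()
	c := &check{State: stateOK, Frequency: 60, StateCheck: now.Add(-10 * time.Minute)}
	scrutinize(now, c, 3)                // last check was 10 minutes ago, cutoff is 3 minutes
	fmt.Println(c.State == stateUnknown) // true
}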
+func scrutinizeState(now time.Time, monitor *m.Check) { + if monitor.State == m.EvalResultUnknown { + return + } + freq := time.Duration(monitor.Frequency) * time.Second + oldest := now.Add(-3 * freq) + if monitor.StateCheck.Before(oldest) { + monitor.State = m.EvalResultUnknown + monitor.StateChange = monitor.StateCheck + } +} + func (rows endpointRows) ToDTO() []m.EndpointDTO { endpointsById := make(map[int64]m.EndpointDTO) endpointChecksById := make(map[int64]map[int64]m.Check) @@ -67,6 +81,7 @@ func (rows endpointRows) ToDTO() []m.EndpointDTO { i := 0 for _, e := range endpointsById { for _, c := range endpointChecksById[e.Id] { + scrutinizeState(time.Now(), &c) e.Checks = append(e.Checks, c) } @@ -747,3 +762,70 @@ func getProbeChecksWithEndpointSlug(sess *session, probe *m.ProbeDTO) ([]CheckWi err = sess.Find(&checks) return checks, err } + +func UpdateCheckState(cState *m.CheckState) (int64, error) { + sess, err := newSession(true, "check") + if err != nil { + return 0, err + } + defer sess.Cleanup() + var affected int64 + if affected, err = updateCheckState(sess, cState); err != nil { + return 0, err + } + sess.Complete() + return affected, nil +} + +func updateCheckState(sess *session, cState *m.CheckState) (int64, error) { + sess.Table("check") + rawSql := "UPDATE check SET state=?, state_change=? WHERE id=? AND state != ? AND state_change < ?" + + res, err := sess.Exec(rawSql, cState.State, cState.Updated, cState.Id, cState.State, cState.Updated) + if err != nil { + return 0, err + } + + aff, _ := res.RowsAffected() + + rawSql = "UPDATE monitor SET state_check=? WHERE id=?" + res, err = sess.Exec(rawSql, cState.Checked, cState.Id) + if err != nil { + return aff, err + } + + return aff, nil +} + +func GetChecksForAlerts(ts int64) ([]m.CheckForAlertDTO, error) { + sess, err := newSession(false, "check") + if err != nil { + return nil, err + } + return getChecksForAlerts(sess, ts) +} + +func getChecksForAlerts(sess *session, ts int64) ([]m.CheckForAlertDTO, error) { + sess.Join("INNER", "endpoint", "check.endpoint_id=endpoint.id") + sess.Where("`check`.enabled=1 AND (? % `check`.frequency) = `check`.offset", ts) + sess.Cols( + "`check`.id", + "`check`.org_id", + "`check`.endpoint_id", + "endpoint.slug", + "endpoint.name", + "`check`.type", + "`check`.offset", + "`check`.frequency", + "`check`.enabled", + "`check`.state_change", + "`check`.state_check", + "`check`.settings", + "`check`.health_settings", + "`check`.created", + "`check`.updated", + ) + checks := make([]m.CheckForAlertDTO, 10) + err := sess.Find(&checks) + return checks, err +}
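updateCheckState above only flips the row when the state actually differs and the incoming timestamp is newer ("state != ? AND state_change < ?"), and execute() treats a non-zero affected-row count as "the state really changed, emit the notification". A library-style sketch of that guard against out-of-order jobs using database/sql with MySQL-style placeholders; no specific driver or table schema beyond the patch's columns is assumed:

package alertingsketch

import (
	"database/sql"
	"time"
)

// updateCheckState writes the new state only when it differs from the stored
// one and the job's timestamp is newer than what is already recorded, so jobs
// that arrive out of order cannot overwrite a more recent result. The returned
// count is non-zero exactly when a state change was written.
func updateCheckState(db *sql.DB, id int64, state int, ts time.Time) (int64, error) {
	res, err := db.Exec(
		"UPDATE `check` SET state=?, state_change=? WHERE id=? AND state != ? AND state_change < ?",
		state, ts, id, state, ts,
	)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}

In the patch that affected count then gates the alerting e-mail: only affected > 0 together with a non-unknown evaluation result triggers a send.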