Skip to content

Commit

Permalink
alert event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
bbengfort committed Aug 4, 2023
1 parent 2b885c0 commit 52bbdba
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 48 deletions.
101 changes: 99 additions & 2 deletions alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package noaalert

import (
"os"
"os/signal"

sdk "github.com/rotationalio/go-ensign"
api "github.com/rotationalio/go-ensign/api/v1beta1"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -42,6 +44,101 @@ func NewAlerts(conf Config) (sub *Subscriber, err error) {
return sub, nil
}

func (s *Subscriber) Listen() (*sdk.Subscription, error) {
return s.ensign.Subscribe(s.conf.Topic)
func (s *Subscriber) Run(cb func(*AlertEvent) error) (err error) {
// Catch OS signals for graceful shutdowns
var alerts <-chan *AlertEvent
done := make(chan struct{})
if alerts, err = s.Listen(done); err != nil {
return err
}

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
go func() {
<-quit
done <- struct{}{}
}()

for alert := range alerts {
if err = cb(alert); err != nil {
done <- struct{}{}
return err
}
}
return nil
}

func (s *Subscriber) Listen(done <-chan struct{}) (_ <-chan *AlertEvent, err error) {
var sub *sdk.Subscription
if sub, err = s.ensign.Subscribe(s.conf.Topic); err != nil {
return nil, err
}

alerts := make(chan *AlertEvent, 100)
go func(alerts chan<- *AlertEvent, sub *sdk.Subscription) {
log.Info().Str("topic", s.conf.Topic).Msg("listening for alerts")
defer sub.Close()

var (
events uint64
skipped uint64
)

eventLoop:
for {
select {
case event := <-sub.C:
log.Debug().Str("id", event.ID()).Str("topic_id", event.TopicID()).Str("type", event.Type.String()).Msg("event recv")

if event.Type.Name != "Alert" {
log.Debug().Str("type", event.Type.String()).Msg("unknown type")
if _, err := event.Nack(api.Nack_UNKNOWN_TYPE); err != nil {
log.Warn().Err(err).Str("id", event.ID()).Str("reason", "unknown_type").Msg("could not nack event")
}
skipped++
continue eventLoop
}

if event.Mimetype != Mimetype {
log.Debug().Str("mimetype", event.Mimetype.MimeType()).Msg("unknown mimetype")
if _, err := event.Nack(api.Nack_UNHANDLED_MIMETYPE); err != nil {
log.Warn().Err(err).Str("id", event.ID()).Str("reason", "unknown_mimetype").Msg("could not nack event")
}
skipped++
continue eventLoop
}

alert := &AlertEvent{
CorrelationID: event.Metadata["correlation_id"],
RequestID: event.Metadata["request_id"],
ServerID: event.Metadata["server_id"],
LastModified: event.Metadata["last_modified"],
Expires: event.Metadata["expires"],
Data: event.Data,
}

if err := alert.parse(); err != nil {
log.Debug().Err(err).Msg("could not parse alert")
if _, err := event.Nack(api.Nack_UNPROCESSED); err != nil {
log.Warn().Err(err).Str("id", event.ID()).Str("reason", "unprocessed").Msg("could not nack event")
}
skipped++
continue eventLoop
}

if _, err := event.Ack(); err != nil {
log.Warn().Err(err).Str("id", event.ID()).Msg("could not ack event")
}

alerts <- alert
events++
case <-done:
log.Info().Uint64("events", events).Uint64("skipped", events).Msg("closing subscription channel")
return
}
}

}(alerts, sub)

return alerts, nil
}
38 changes: 19 additions & 19 deletions cmd/noaalert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/bbengfort/noaalert"
"github.com/joho/godotenv"
"github.com/rotationalio/go-ensign"
"github.com/rs/zerolog/log"
cli "github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -72,16 +71,19 @@ func subscribe(c *cli.Context) (err error) {
return cli.Exit(err, 1)
}

var events *ensign.Subscription
if events, err = sub.Listen(); err != nil {
return cli.Exit(err, 1)
}
defer events.Close()
err = sub.Run(func(alert *noaalert.AlertEvent) (err error) {
var headline string
if headline, err = alert.Headline(); err != nil {
log.Warn().Err(err).Msg("could not get headline from alert")
return nil
}

for event := range events.C {
// TODO: do a better job of printing events out
fmt.Println(event)
event.Ack()
log.Info().Msg(headline)
return nil
})

if err != nil {
return cli.Exit(err, 1)
}
return nil
}
Expand All @@ -95,19 +97,17 @@ func alerts(c *cli.Context) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var features []interface{}
if features, err = api.Alerts(ctx); err != nil {
var events []*noaalert.AlertEvent
if events, err = api.Alerts(ctx); err != nil {
return cli.Exit(err, 1)
}

for _, feature := range features {
fmap := feature.(map[string]interface{})
props := fmap["properties"].(map[string]interface{})

headline, ok := props["headline"].(string)
if ok {
fmt.Println(headline)
for _, event := range events {
var headline string
if headline, err = event.Headline(); err != nil {
continue
}
fmt.Println(headline)
}
return nil
}
8 changes: 8 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package noaalert

import "errors"

var (
ErrNoProperties = errors.New("parsed alert contains no properties")
ErrNoHeadline = errors.New("parsed alert conains no headline")
)
68 changes: 68 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package noaalert

import (
"encoding/json"

"github.com/rotationalio/go-ensign"
api "github.com/rotationalio/go-ensign/api/v1beta1"
mimetype "github.com/rotationalio/go-ensign/mimetype/v1beta1"
)

type AlertEvent struct {
CorrelationID string
RequestID string
ServerID string
LastModified string
Expires string
Data []byte
parsed map[string]interface{}
}

var Mimetype = mimetype.ApplicationJSON

var AlertType = &api.Type{
Name: "Alert",
MajorVersion: 1,
MinorVersion: 0,
PatchVersion: 0,
}

func (a *AlertEvent) Event() *ensign.Event {
meta := make(ensign.Metadata)
meta["correlation_id"] = a.CorrelationID
meta["request_id"] = a.RequestID
meta["server_id"] = a.ServerID
meta["last_modified"] = a.LastModified
meta["expires"] = a.Expires

return &ensign.Event{
Metadata: meta,
Data: a.Data,
Type: AlertType,
Mimetype: Mimetype,
}
}

func (a *AlertEvent) Headline() (_ string, err error) {
if err = a.parse(); err != nil {
return "", err
}

props, ok := a.parsed["properties"].(map[string]interface{})
if !ok {
return "", ErrNoProperties
}

headline, ok := props["headline"].(string)
if !ok {
return "", ErrNoHeadline
}
return headline, nil
}

func (a *AlertEvent) parse() error {
if a.parsed == nil {
return json.Unmarshal(a.Data, &a.parsed)
}
return nil
}
30 changes: 28 additions & 2 deletions noaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewWeatherAPI() (api *Weather, err error) {
return api, nil
}

func (s *Weather) Alerts(ctx context.Context) (_ []interface{}, err error) {
func (s *Weather) Alerts(ctx context.Context) (_ []*AlertEvent, err error) {
var req *http.Request
if req, err = s.NewRequest(ctx, http.MethodGet, "/alerts/active", nil, nil); err != nil {
return nil, err
Expand All @@ -71,7 +71,33 @@ func (s *Weather) Alerts(ctx context.Context) (_ []interface{}, err error) {
}

if features, ok := alerts["features"]; ok {
return features.([]interface{}), nil
if featureList, ok := features.([]interface{}); ok {
// Get the NOAA request headers to create events
correlationID := rep.Header.Get("X-Correlation-Id")
requestID := rep.Header.Get("X-Request-Id")
serverID := rep.Header.Get("X-Server-Id")
lastModified := rep.Header.Get("Last-Modified")
expires := rep.Header.Get("Expires")

events := make([]*AlertEvent, 0, len(featureList))
for _, feature := range featureList {
event := &AlertEvent{
CorrelationID: correlationID,
RequestID: requestID,
ServerID: serverID,
LastModified: lastModified,
Expires: expires,
}

if event.Data, err = json.Marshal(feature); err != nil {
return nil, err
}

events = append(events, event)
}

return events, nil
}
}
return nil, fmt.Errorf("no alerts returned")
}
Expand Down
47 changes: 23 additions & 24 deletions noaalert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package noaalert

import (
"context"
"encoding/json"
"os"
"os/signal"
"time"

sdk "github.com/rotationalio/go-ensign"
mimetype "github.com/rotationalio/go-ensign/mimetype/v1beta1"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -75,6 +73,7 @@ func (p *Publisher) Run() error {

p.started = time.Now()
ticker := time.NewTicker(p.conf.Interval)
log.Info().Dur("interval", p.conf.Interval).Str("topic", p.conf.Topic).Msg("starting alerts publisher")

// Begin API query loop
queryLoop:
Expand All @@ -83,18 +82,19 @@ queryLoop:
case err := <-p.echan:
return err
case <-ticker.C:
log.Debug().Msg("starting collection of noaa alerts")

count := 0
for _, alert := range p.Alerts() {
event := &sdk.Event{Data: alert, Mimetype: mimetype.ApplicationJSON}
if err := p.ensign.Publish(p.conf.Topic, event); err != nil {
for alert := range p.Alerts() {
if err := p.ensign.Publish(p.conf.Topic, alert); err != nil {
log.Error().Err(err).Int("count", count).Msg("could not publish weather alert")
continue queryLoop
}

// TODO: check acks/nacks
count++
}
log.Info().Int("count", count).Msg("weather alerts published")
log.Info().Str("topic", p.conf.Topic).Int("count", count).Msg("weather alerts published")
}
}
}
Expand All @@ -108,27 +108,26 @@ func (p *Publisher) Shutdown() (err error) {
return nil
}

func (p *Publisher) Alerts() [][]byte {
// TODO: set default timeout in configuration
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
func (p *Publisher) Alerts() <-chan *sdk.Event {
events := make(chan *sdk.Event)
go func(events chan<- *sdk.Event) {
defer close(events)

alerts, err := p.api.Alerts(ctx)
if err != nil {
log.Warn().Err(err).Msg("could not fetch noaa alerts")
return nil
}
// TODO: set default timeout in configuration
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

data := make([][]byte, 0, len(alerts))
for _, alert := range alerts {
// TODO: don't republish alerts
alertjson, err := json.Marshal(alert)
alerts, err := p.api.Alerts(ctx)
if err != nil {
log.Warn().Err(err).Msg("could not parse alert json")
continue
log.Warn().Err(err).Msg("could not fetch noaa alerts")
return
}
data = append(data, alertjson)
}

return data
log.Debug().Int("nalerts", len(alerts)).Msg("received alerts from NOAA")
for _, alert := range alerts {
// TODO: don't republish alerts
events <- alert.Event()
}
}(events)
return events
}
Loading

0 comments on commit 52bbdba

Please sign in to comment.