Skip to content

Commit bc7f754

Browse files
committed
Implement webhook publisher
- Add webhook configuration with URL, HTTP method, and send body options - Implement webhook publisher with deduplication using message fingerprints - Support for different HTTP methods and optional JSON body sending - Update processor to initialize webhook publisher when configured
1 parent 23a0f9d commit bc7f754

File tree

9 files changed

+693
-2
lines changed

9 files changed

+693
-2
lines changed

cmd/lambda.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const (
1919
categoryProcessor = "PROCESSOR:"
2020
categorySlack = "PUBLISHER SLACK:"
2121
categoryPagerDuty = "PUBLISHER PAGERDUTY:"
22+
categoryWebhook = "PUBLISHER WEBHOOK:"
2223
)
2324

2425
var (
@@ -31,14 +32,17 @@ func CommandLambda(cfg *config.Config) *cli.Command {
3132
envPrefixProcessor := strings.ToUpper(strings.ReplaceAll(strings.ReplaceAll(categoryProcessor, " ", "_"), ":", "")) + "_"
3233
envPrefixSlack := strings.ToUpper(strings.ReplaceAll(strings.ReplaceAll(categorySlack, " ", "_"), ":", "")) + "_"
3334
envPrefixPagerDuty := strings.ToUpper(strings.ReplaceAll(strings.ReplaceAll(categoryPagerDuty, " ", "_"), ":", "")) + "_"
35+
envPrefixWebhook := strings.ToUpper(strings.ReplaceAll(strings.ReplaceAll(categoryWebhook, " ", "_"), ":", "")) + "_"
3436

3537
cliPrefixDynamoDB := strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(categoryDynamoDB, " ", "-"), ":", "")) + "-"
3638
cliPrefixProcessor := strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(categoryProcessor, " ", "-"), ":", "")) + "-"
3739
cliPrefixSlack := strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(categorySlack, " ", "-"), ":", "")) + "-"
3840
cliPrefixPagerDuty := strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(categoryPagerDuty, " ", "-"), ":", "")) + "-"
41+
cliPrefixWebhook := strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(categoryWebhook, " ", "-"), ":", "")) + "-"
3942

4043
envSlackToken := envPrefix + envPrefixSlack + "TOKEN"
4144
envPagerDutyIntegrationKey := envPrefix + envPrefixPagerDuty + "INTEGRATION_KEY"
45+
envWebhookURL := envPrefix + envPrefixWebhook + "URL"
4246

4347
rawProcessorIgnoreRules := &cli.StringSlice{}
4448
rawProcessorMatchLabels := &cli.StringSlice{}
@@ -100,11 +104,40 @@ func CommandLambda(cfg *config.Config) *cli.Command {
100104
},
101105
}
102106

107+
flagsWebhook := []cli.Flag{
108+
&cli.StringFlag{
109+
Category: categoryWebhook,
110+
Destination: &cfg.Webhook.URL,
111+
EnvVars: []string{envWebhookURL},
112+
Name: cliPrefixWebhook + "url",
113+
Usage: "webhook `URL` to send alerts to (either raw URL, or ARN of secret manager)",
114+
},
115+
116+
&cli.StringFlag{
117+
Category: categoryWebhook,
118+
Destination: &cfg.Webhook.Method,
119+
EnvVars: []string{envPrefix + envPrefixWebhook + "METHOD"},
120+
Name: cliPrefixWebhook + "method",
121+
Usage: "HTTP `method` to use for webhook requests",
122+
Value: "POST",
123+
},
124+
125+
&cli.BoolFlag{
126+
Category: categoryWebhook,
127+
Destination: &cfg.Webhook.SendBody,
128+
EnvVars: []string{envPrefix + envPrefixWebhook + "SEND_BODY"},
129+
Name: cliPrefixWebhook + "send-body",
130+
Usage: "whether to send alert data as JSON body in webhook requests",
131+
Value: true,
132+
},
133+
}
134+
103135
flags := slices.Concat(
104136
flagsDB,
105137
flagsProcessor,
106138
flagsSlack,
107139
flagsPagerDuty,
140+
flagsWebhook,
108141
)
109142

110143
return &cli.Command{
@@ -132,6 +165,12 @@ func CommandLambda(cfg *config.Config) *cli.Command {
132165
return err
133166
}
134167

168+
cfg.Webhook.URL, err = stringOrLoadFromSecretsmanager(
169+
cfg.Webhook.URL, envWebhookURL)
170+
if err != nil {
171+
return err
172+
}
173+
135174
{ // parse the list of ignored rules
136175
processorIgnoreRules := rawProcessorIgnoreRules.Value()
137176
if len(processorIgnoreRules) > 0 {

config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ type Config struct {
77

88
PagerDuty *PagerDuty `yaml:"pagerduty"`
99
Slack *Slack `yaml:"slack"`
10+
Webhook *Webhook `yaml:"webhook"`
1011
}
1112

1213
func New() *Config {
@@ -17,5 +18,6 @@ func New() *Config {
1718

1819
PagerDuty: &PagerDuty{},
1920
Slack: &Slack{Channel: &SlackChannel{}},
21+
Webhook: &Webhook{},
2022
}
2123
}

config/webhook.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package config
2+
3+
type Webhook struct {
4+
URL string `yaml:"url"`
5+
Method string `yaml:"method"`
6+
SendBody bool `yaml:"send_body"`
7+
}
8+
9+
func (w *Webhook) Enabled() bool {
10+
return w.URL != ""
11+
}

mock/publisher/webhook.go

Lines changed: 56 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processor.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package processor
22

33
import (
44
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
57
"errors"
68
"time"
79

@@ -51,6 +53,15 @@ func New(cfg *config.Config) (*Processor, error) {
5153
publishers = append(publishers, publisher.NewPagerDuty(cfg.PagerDuty))
5254
}
5355

56+
if cfg.Webhook.Enabled() {
57+
urlHash := sha256.Sum256([]byte(cfg.Webhook.URL))
58+
59+
publishers = append(publishers, publisher.NewWebhook(
60+
cfg.Webhook,
61+
db.WithNamespace("webhook-"+hex.EncodeToString(urlHash[:])),
62+
))
63+
}
64+
5465
if len(publishers) == 0 {
5566
return nil, ErrPublisherUndefined
5667
}

publisher/publisher.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Publisher interface {
1212
}
1313

1414
const (
15-
timeoutLock = time.Second
16-
timeoutThreadExpiry = 30 * 24 * time.Hour
15+
timeoutLock = time.Second
16+
timeoutThreadExpiry = 30 * 24 * time.Hour
17+
timeoutWebhookExpiry = 30 * 24 * time.Hour
1718
)

publisher/webhook.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package publisher
2+
3+
//go:generate go tool mockgen -package mock_publisher -destination ../mock/publisher/webhook.go -source webhook.go -mock_names httpClient=Mock_httpClient httpClient
4+
5+
import (
6+
"bytes"
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"io"
11+
"net/http"
12+
13+
"github.com/flashbots/amp-alerts-sink/config"
14+
"github.com/flashbots/amp-alerts-sink/db"
15+
"github.com/flashbots/amp-alerts-sink/logutils"
16+
"github.com/flashbots/amp-alerts-sink/types"
17+
"go.uber.org/zap"
18+
)
19+
20+
type webhook struct {
21+
url string
22+
method string
23+
sendBody bool
24+
25+
client httpClient
26+
db db.DB
27+
}
28+
29+
type httpClient interface {
30+
Do(req *http.Request) (*http.Response, error)
31+
}
32+
33+
func NewWebhook(cfg *config.Webhook, db db.DB) Publisher {
34+
method := cfg.Method
35+
if method == "" {
36+
method = "POST"
37+
}
38+
39+
return &webhook{
40+
url: cfg.URL,
41+
method: method,
42+
sendBody: cfg.SendBody,
43+
44+
client: http.DefaultClient,
45+
db: db,
46+
}
47+
}
48+
49+
func (w *webhook) Publish(
50+
ctx context.Context,
51+
source string,
52+
alert *types.AlertmanagerAlert,
53+
) error {
54+
l := logutils.LoggerFromContext(ctx)
55+
l.Info("Publishing alert", zap.Any("alert", alert))
56+
57+
isDup, err := w.checkDup(ctx, alert)
58+
if isDup {
59+
// Enter this branch even with non-nil err;
60+
// Only for ErrAlreadyLocked, so that lambda execution will be restarted
61+
l.Info("Duplicate alert detected", zap.Error(err))
62+
return err
63+
}
64+
// not a duplicate
65+
if err != nil {
66+
l.Error("Failed to check for duplicate alert, sending webhook", zap.Error(err))
67+
}
68+
69+
err = w.sendWebhook(ctx, source, alert)
70+
if err != nil {
71+
l.Error("Failed to send alert", zap.Error(err))
72+
} else {
73+
// sent correctly, prevent other instances from sending
74+
_ = w.db.Set(ctx, alert.MessageDedupKey(), timeoutWebhookExpiry, "1")
75+
}
76+
return err
77+
}
78+
79+
func (w *webhook) checkDup(ctx context.Context, alert *types.AlertmanagerAlert) (isDup bool, err error) {
80+
v, err := w.db.Get(ctx, alert.MessageDedupKey())
81+
if err != nil {
82+
return false, fmt.Errorf("failed to check for duplicate alert: %w", err)
83+
}
84+
if v != "" {
85+
return true, nil
86+
}
87+
88+
didLock, err := w.db.Lock(ctx, alert.MessageDedupKey(), timeoutLock)
89+
if err != nil {
90+
return false, fmt.Errorf("failed to lock alert: %w", err)
91+
}
92+
if !didLock {
93+
// another instance is about to publish
94+
return true, ErrAlreadyLocked
95+
}
96+
97+
return false, nil
98+
}
99+
100+
func (w *webhook) sendWebhook(
101+
ctx context.Context,
102+
source string,
103+
alert *types.AlertmanagerAlert,
104+
) error {
105+
l := logutils.LoggerFromContext(ctx)
106+
107+
var reqBody io.Reader
108+
var contentType string
109+
110+
if w.sendBody {
111+
buf, err := w.encodeAlert(source, alert)
112+
if err != nil {
113+
l.Error("Failed to encode alert", zap.Error(err))
114+
return err
115+
}
116+
l.Debug("Webhook payload", zap.ByteString("body", buf.Bytes()))
117+
118+
reqBody = buf
119+
contentType = "application/json"
120+
}
121+
122+
req, err := http.NewRequestWithContext(ctx, w.method, w.url, reqBody)
123+
if err != nil {
124+
l.Error("Failed to create webhook request", zap.Error(err))
125+
return fmt.Errorf("failed to create webhook request: %w", err)
126+
}
127+
128+
if contentType != "" {
129+
req.Header.Set("Content-Type", contentType)
130+
}
131+
132+
l.Info("Sending webhook request",
133+
zap.String("url", w.url),
134+
zap.String("method", w.method),
135+
zap.Bool("send_body", w.sendBody),
136+
zap.String("alert_fingerprint", alert.MessageDedupKey()),
137+
)
138+
139+
resp, err := w.client.Do(req)
140+
if err != nil {
141+
l.Error("Webhook request failed", zap.Error(err))
142+
return fmt.Errorf("webhook request failed: %w", err)
143+
}
144+
defer resp.Body.Close()
145+
146+
respBody, readErr := io.ReadAll(resp.Body)
147+
if readErr != nil {
148+
l.Warn("Failed to read webhook response body", zap.Error(readErr))
149+
}
150+
151+
if resp.StatusCode != http.StatusOK {
152+
l.Error("Webhook returned non-200 status",
153+
zap.Int("status_code", resp.StatusCode),
154+
zap.String("response_body", string(respBody)),
155+
)
156+
return fmt.Errorf("webhook returned status %d: %s", resp.StatusCode, resp.Status)
157+
}
158+
159+
l.Info("Successfully published alert to webhook",
160+
zap.String("response_body", string(respBody)),
161+
)
162+
return nil
163+
}
164+
165+
func (w *webhook) encodeAlert(source string, alert *types.AlertmanagerAlert) (*bytes.Buffer, error) {
166+
body := types.AlertmanagerWebhook{
167+
Version: "4",
168+
GroupKey: alert.MessageDedupKey(),
169+
170+
AlertmanagerMessage: types.AlertmanagerMessage{
171+
Receiver: source,
172+
Status: alert.Status,
173+
Alerts: []types.AlertmanagerAlert{*alert},
174+
},
175+
}
176+
177+
buf := &bytes.Buffer{}
178+
if err := json.NewEncoder(buf).Encode(body); err != nil {
179+
return nil, fmt.Errorf("failed to marshal webhook payload: %w", err)
180+
}
181+
182+
return buf, nil
183+
}

0 commit comments

Comments
 (0)