Skip to content

Commit

Permalink
Merge branch 'main' into feat/kafka-receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
magiskboy committed Jan 13, 2025
2 parents a11a558 + 3b61ae8 commit c99199b
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 40 deletions.
1 change: 1 addition & 0 deletions .promu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ go:
# Whenever the Go version is updated here,
# .circle/config.yml should also be updated.
version: 1.23
cgo: true
repository:
path: github.com/prometheus/alertmanager
build:
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ type Receiver struct {
MSTeamsV2Configs []*MSTeamsV2Config `yaml:"msteamsv2_configs,omitempty" json:"msteamsv2_configs,omitempty"`
JiraConfigs []*JiraConfig `yaml:"jira_configs,omitempty" json:"jira_configs,omitempty"`
RocketchatConfigs []*RocketchatConfig `yaml:"rocketchat_configs,omitempty" json:"rocketchat_configs,omitempty"`
KafkaConfigs []*KafkaConfig `yaml:"kafka_configs,omitempty" json:"kafka_configs,omitempty"`
KafkaConfigs []*KafkaConfig `yaml:"kafka_configs,omitempty" json:"kafka_configs,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface for Receiver.
Expand Down
22 changes: 9 additions & 13 deletions config/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,9 @@ var (
NotifierConfig: NotifierConfig{
VSendResolved: true,
},

BootstrapServers: `{{ template "kafka.default.bootstrap_servers" . }}`,
Topic: `{{ template "kafka.default.topic" . }}`,
NumberOfPartition: 0,
BootstrapServers: `{{ template "kafka.default.bootstrap_servers" . }}`,
Topic: `{{ template "kafka.default.topic" . }}`,
NumberOfPartition: 0,
}
)

Expand Down Expand Up @@ -1008,20 +1007,17 @@ func (c *RocketchatConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
}

type KafkaConfig struct {
NotifierConfig `yaml:",inline" json:",inline"`
NotifierConfig `yaml:",inline" json:",inline"`

BootstrapServers string `yaml:"bootstrap_servers" json:"bootstrap_servers"`
Topic string `yaml:"topic" json:"topic"`
ExtrasConfigs *map[string]string `yaml:"extras_configs" json:"extras_configs"`
NumberOfPartition int `yaml:"number_of_partitions" json:"number_of_partitions"`
BootstrapServers string `yaml:"bootstrap_servers" json:"bootstrap_servers"`
Topic string `yaml:"topic" json:"topic"`
ExtrasConfigs *map[string]string `yaml:"extras_configs" json:"extras_configs"`
NumberOfPartition int32 `yaml:"number_of_partitions" json:"number_of_partitions"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *KafkaConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultKafkaConfig
type plain KafkaConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}
2 changes: 1 addition & 1 deletion config/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/alertmanager/notify/discord"
"github.com/prometheus/alertmanager/notify/email"
"github.com/prometheus/alertmanager/notify/jira"
"github.com/prometheus/alertmanager/notify/kafka"
"github.com/prometheus/alertmanager/notify/msteams"
"github.com/prometheus/alertmanager/notify/msteamsv2"
"github.com/prometheus/alertmanager/notify/opsgenie"
Expand All @@ -37,7 +38,6 @@ import (
"github.com/prometheus/alertmanager/notify/webex"
"github.com/prometheus/alertmanager/notify/webhook"
"github.com/prometheus/alertmanager/notify/wechat"
"github.com/prometheus/alertmanager/notify/kafka"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)
Expand Down
53 changes: 28 additions & 25 deletions notify/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@ import (
"context"
"encoding/json"
"log/slog"
"sync"

"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"

ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

var nextPartition = 0

// Notifier implements a Notifier for Discord notifications.
type Notifier struct {
conf *config.KafkaConfig
tmpl *template.Template
logger *slog.Logger
retrier *notify.Retrier
producer *ckafka.Producer
conf *config.KafkaConfig
logger *slog.Logger
producer *ckafka.Producer
partitionIndex int32
partitionIndexMutex sync.Mutex
}

type KafkaMessage struct {
Expand All @@ -44,39 +41,50 @@ type KafkaMessage struct {

// New returns a new Kafka notifier.
func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {

kafkaConfig := ckafka.ConfigMap{
"bootstrap.servers": c.BootstrapServers,
}

if c.ExtrasConfigs != nil {
for k, v := range *c.ExtrasConfigs {
kafkaConfig.SetKey(k, v)
if err := kafkaConfig.SetKey(k, v); err != nil {
return nil, err
}
}
}

p, err := ckafka.NewProducer(&kafkaConfig)

if err != nil {
return nil, err
}

slog.Log(context.Background(), slog.LevelInfo, "Connected to Kafka")

n := &Notifier{
conf: c,
logger: l,
producer: p,
conf: c,
logger: l,
producer: p,
partitionIndex: 0,
}

return n, nil
}

func (n *Notifier) NextPartition() {
n.partitionIndexMutex.Lock()
defer n.partitionIndexMutex.Unlock()
n.partitionIndex = (n.partitionIndex + 1) % n.conf.NumberOfPartition
}

func (n *Notifier) GetPartition() int32 {
return n.partitionIndex
}

// Notify implements the Notifier interface.
func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
n.logger.Info("Sending alert to Kafka")
var buf bytes.Buffer
message := KafkaMessage{ Alerts: as }
message := KafkaMessage{Alerts: as}

if err := json.NewEncoder(&buf).Encode(message); err != nil {
slog.Log(ctx, slog.LevelError, "Failed to encode alert", "err", err)
Expand All @@ -89,23 +97,18 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
}

if unflushed := n.producer.Flush(1000); unflushed == 0 {
nextPartition++
if nextPartition == n.conf.NumberOfPartition {
nextPartition = 0
}

n.NextPartition()
n.logger.Info("Successfully produced alert")
return false, nil
}

return false, nil
}

func (n *Notifier) Produce(ctx context.Context, topic string, key string, value []byte) error {
func (n *Notifier) Produce(ctx context.Context, topic, key string, value []byte) error {
return n.producer.Produce(&ckafka.Message{
TopicPartition: ckafka.TopicPartition{Topic: &topic, Partition: int32(nextPartition)},
TopicPartition: ckafka.TopicPartition{Topic: &topic, Partition: n.GetPartition()},
Key: []byte(key),
Value: value,
}, nil);
}, nil)
}

0 comments on commit c99199b

Please sign in to comment.