Skip to content

Commit

Permalink
feat(kafka-receiver): add test
Browse files Browse the repository at this point in the history
  • Loading branch information
magiskboy committed Jan 15, 2025
1 parent 045334f commit ee6c730
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 34 deletions.
2 changes: 1 addition & 1 deletion config/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func BuildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, logg
add("rocketchat", i, c, func(l *slog.Logger) (notify.Notifier, error) { return rocketchat.New(c, tmpl, l, httpOpts...) })
}
for i, c := range nc.KafkaConfigs {
add("kafka", i, c, func(l *slog.Logger) (notify.Notifier, error) { return kafka.New(c, l) })
add("kafka", i, c, func(l *slog.Logger) (notify.Notifier, error) { return kafka.New(c, l, nil) })
}

if errs.Len() > 0 {
Expand Down
74 changes: 41 additions & 33 deletions notify/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,26 @@ import (

// Notifier implements a Notifier for Discord notifications.
type Notifier struct {
conf *config.KafkaConfig
logger *slog.Logger
writer *ckafka.Writer
numberOfPartition int
partitionIndex int
partitionIndexMutex sync.Mutex
conf *config.KafkaConfig
logger *slog.Logger
partitions int
partitionIdx int
partitionIdxMutex sync.Mutex
sendFunc func(ctx context.Context, msgs ...ckafka.Message) error
}

// New returns a new Kafka notifier.
func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {
func New(c *config.KafkaConfig, l *slog.Logger, sendFunc *func(ctx context.Context, msgs ...ckafka.Message) error) (*Notifier, error) {
n := &Notifier{
conf: c,
logger: l,
}

if sendFunc != nil {
n.sendFunc = *sendFunc
return n, nil
}

transport := ckafka.Transport{}

if c.SecurityProtocol != nil {
Expand Down Expand Up @@ -70,62 +80,60 @@ func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {
writer.WriteTimeout = 45 * time.Second
}

n := &Notifier{
conf: c,
logger: l,
writer: writer,
}

if c.NumberOfPartition != nil {
n.numberOfPartition = *c.NumberOfPartition
n.partitions = *c.NumberOfPartition
} else {
n.numberOfPartition = 1
n.partitions = 1
}

n.sendFunc = func(ctx context.Context, msgs ...ckafka.Message) error {
return writer.WriteMessages(ctx, msgs...)
}

return n, nil
}

// GetPartitionIndex returns the current partition index.
func (n *Notifier) GetPartitionIndex() int {
return n.partitionIndex
return n.partitionIdx
}

// NextPartition returns the next partition index.
func (n *Notifier) NextPartition() {
n.partitionIndexMutex.Lock()
n.partitionIndex = (n.partitionIndex + 1) % n.numberOfPartition
n.partitionIndexMutex.Unlock()
n.partitionIdxMutex.Lock()
n.partitionIdx = (n.partitionIdx + 1) % n.partitions
n.partitionIdxMutex.Unlock()
}

// Notify implements the Notifier interface.
func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
// Because retry is supported by kafka-go so it will be always false
var buf bytes.Buffer
var msgs []ckafka.Message
// Because retry is supported by kafka-go so it will be always false
shouldRetry := false

for _, alert := range as {
if err := json.NewEncoder(&buf).Encode(alert); err != nil {
slog.Log(ctx, slog.LevelError, fmt.Sprintf("Failed to marshal alert: %s", alert.Name()), "err", err)
}

if err := n.Produce(ctx, alert.Name(), buf.Bytes()); err != nil {
slog.Log(ctx, slog.LevelError, fmt.Sprintf("Failed to produce alert: %s", alert.Name()), "err", err)
message := ckafka.Message{
Key: []byte(alert.Name()),
Value: buf.Bytes(),
}
}

return shouldRetry, nil
}
if n.conf.NumberOfPartition != nil {
message.Partition = n.GetPartitionIndex()
n.NextPartition()
}

// Produce sends a message to Kafka.
func (n *Notifier) Produce(ctx context.Context, key string, value []byte) error {
message := ckafka.Message{
Key: []byte(key),
Value: value,
msgs = append(msgs, message)
}

if n.conf.NumberOfPartition != nil {
message.Partition = n.GetPartitionIndex()
if err := n.sendFunc(ctx, msgs...); err != nil {
slog.Log(ctx, slog.LevelError, "Failed to send message", "err", err)
return shouldRetry, err
}

return n.writer.WriteMessages(ctx, message)
return shouldRetry, nil
}
30 changes: 30 additions & 0 deletions notify/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package kafka

import (
"context"
"testing"

"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"

"github.com/prometheus/alertmanager/config"

ckafka "github.com/segmentio/kafka-go"
)

func TestKafkaRetry(t *testing.T) {
sendFunc := func(ctx context.Context, msgs ...ckafka.Message) error {
return nil
}

notifier, err := New(
&config.KafkaConfig{
Brokers: []string{"localhost:9092"},
},
promslog.NewNopLogger(),
&sendFunc,
)

require.NoError(t, err)
require.NotNil(t, notifier)
}

0 comments on commit ee6c730

Please sign in to comment.