diff --git a/config/receiver/receiver.go b/config/receiver/receiver.go index 79f21ab5ab..874bf9dd71 100644 --- a/config/receiver/receiver.go +++ b/config/receiver/receiver.go @@ -111,7 +111,7 @@ func BuildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, logg add("rocketchat", i, c, func(l *slog.Logger) (notify.Notifier, error) { return rocketchat.New(c, tmpl, l, httpOpts...) }) } for i, c := range nc.KafkaConfigs { - add("kafka", i, c, func(l *slog.Logger) (notify.Notifier, error) { return kafka.New(c, l) }) + add("kafka", i, c, func(l *slog.Logger) (notify.Notifier, error) { return kafka.New(c, l, nil) }) } if errs.Len() > 0 { diff --git a/notify/kafka/kafka.go b/notify/kafka/kafka.go index 161a3b1e33..61399eea78 100644 --- a/notify/kafka/kafka.go +++ b/notify/kafka/kafka.go @@ -32,16 +32,26 @@ import ( // Notifier implements a Notifier for Discord notifications. type Notifier struct { - conf *config.KafkaConfig - logger *slog.Logger - writer *ckafka.Writer - numberOfPartition int - partitionIndex int - partitionIndexMutex sync.Mutex + conf *config.KafkaConfig + logger *slog.Logger + partitions int + partitionIdx int + partitionIdxMutex sync.Mutex + sendFunc func(ctx context.Context, msgs ...ckafka.Message) error } // New returns a new Kafka notifier. -func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) { +func New(c *config.KafkaConfig, l *slog.Logger, sendFunc *func(ctx context.Context, msgs ...ckafka.Message) error) (*Notifier, error) { + n := &Notifier{ + conf: c, + logger: l, + } + + if sendFunc != nil { + n.sendFunc = *sendFunc + return n, nil + } + transport := ckafka.Transport{} if c.SecurityProtocol != nil { @@ -70,16 +80,14 @@ func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) { writer.WriteTimeout = 45 * time.Second } - n := &Notifier{ - conf: c, - logger: l, - writer: writer, - } - if c.NumberOfPartition != nil { - n.numberOfPartition = *c.NumberOfPartition + n.partitions = *c.NumberOfPartition } else { - n.numberOfPartition = 1 + n.partitions = 1 + } + + n.sendFunc = func(ctx context.Context, msgs ...ckafka.Message) error { + return writer.WriteMessages(ctx, msgs...) } return n, nil @@ -87,20 +95,21 @@ func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) { // GetPartitionIndex returns the current partition index. func (n *Notifier) GetPartitionIndex() int { - return n.partitionIndex + return n.partitionIdx } // NextPartition returns the next partition index. func (n *Notifier) NextPartition() { - n.partitionIndexMutex.Lock() - n.partitionIndex = (n.partitionIndex + 1) % n.numberOfPartition - n.partitionIndexMutex.Unlock() + n.partitionIdxMutex.Lock() + n.partitionIdx = (n.partitionIdx + 1) % n.partitions + n.partitionIdxMutex.Unlock() } // Notify implements the Notifier interface. func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { - // Because retry is supported by kafka-go so it will be always false var buf bytes.Buffer + var msgs []ckafka.Message + // Because retry is supported by kafka-go so it will be always false shouldRetry := false for _, alert := range as { @@ -108,24 +117,23 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) slog.Log(ctx, slog.LevelError, fmt.Sprintf("Failed to marshal alert: %s", alert.Name()), "err", err) } - if err := n.Produce(ctx, alert.Name(), buf.Bytes()); err != nil { - slog.Log(ctx, slog.LevelError, fmt.Sprintf("Failed to produce alert: %s", alert.Name()), "err", err) + message := ckafka.Message{ + Key: []byte(alert.Name()), + Value: buf.Bytes(), } - } - return shouldRetry, nil -} + if n.conf.NumberOfPartition != nil { + message.Partition = n.GetPartitionIndex() + n.NextPartition() + } -// Produce sends a message to Kafka. -func (n *Notifier) Produce(ctx context.Context, key string, value []byte) error { - message := ckafka.Message{ - Key: []byte(key), - Value: value, + msgs = append(msgs, message) } - if n.conf.NumberOfPartition != nil { - message.Partition = n.GetPartitionIndex() + if err := n.sendFunc(ctx, msgs...); err != nil { + slog.Log(ctx, slog.LevelError, "Failed to send message", "err", err) + return shouldRetry, err } - return n.writer.WriteMessages(ctx, message) + return shouldRetry, nil } diff --git a/notify/kafka/kafka_test.go b/notify/kafka/kafka_test.go new file mode 100644 index 0000000000..b217842d51 --- /dev/null +++ b/notify/kafka/kafka_test.go @@ -0,0 +1,43 @@ +// Copyright 2025 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "testing" + + "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" + + "github.com/prometheus/alertmanager/config" + + ckafka "github.com/segmentio/kafka-go" +) + +func TestKafkaRetry(t *testing.T) { + sendFunc := func(ctx context.Context, msgs ...ckafka.Message) error { + return nil + } + + notifier, err := New( + &config.KafkaConfig{ + Brokers: []string{"localhost:9092"}, + }, + promslog.NewNopLogger(), + &sendFunc, + ) + + require.NoError(t, err) + require.NotNil(t, notifier) +}