Skip to content

Commit

Permalink
feat(kafka-receiver): add test
Browse files Browse the repository at this point in the history
Signed-off-by: Nguyen Khac Thanh <[email protected]>
  • Loading branch information
magiskboy committed Jan 15, 2025
1 parent 045334f commit 8cc6b2b
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 38 deletions.
7 changes: 3 additions & 4 deletions config/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,8 @@ var (
VSendResolved: true,
},

Brokers: []string{},
Topic: `{{ template "kafka.default.topic" . }}`,
NumberOfPartition: nil,
Brokers: []string{},
Topic: `{{ template "kafka.default.topic" . }}`,
}
)

Expand Down Expand Up @@ -1012,7 +1011,7 @@ type KafkaConfig struct {

Brokers []string `yaml:"brokers" json:"brokers"`
Topic string `yaml:"topic" json:"topic"`
NumberOfPartition *int `yaml:"number_of_partitions" json:"number_of_partitions"`
NumberOfPartition int `yaml:"number_of_partitions" json:"number_of_partitions"`
SecurityProtocol *string `yaml:"security_protocol" json:"security_protocol"`
Username *string `yaml:"username" json:"username"`
Password *string `yaml:"password" json:"password"`
Expand Down
2 changes: 1 addition & 1 deletion config/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func BuildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, logg
add("rocketchat", i, c, func(l *slog.Logger) (notify.Notifier, error) { return rocketchat.New(c, tmpl, l, httpOpts...) })
}
for i, c := range nc.KafkaConfigs {
add("kafka", i, c, func(l *slog.Logger) (notify.Notifier, error) { return kafka.New(c, l) })
add("kafka", i, c, func(l *slog.Logger) (notify.Notifier, error) { return kafka.New(c, l, nil) })
}

if errs.Len() > 0 {
Expand Down
73 changes: 40 additions & 33 deletions notify/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,25 @@ import (

// Notifier implements a Notifier for Discord notifications.
type Notifier struct {
conf *config.KafkaConfig
logger *slog.Logger
writer *ckafka.Writer
numberOfPartition int
partitionIndex int
partitionIndexMutex sync.Mutex
conf *config.KafkaConfig
logger *slog.Logger
partition int
partitionMutex sync.Mutex
sendFunc func(ctx context.Context, msgs ...ckafka.Message) error
}

// New returns a new Kafka notifier.
func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {
func New(c *config.KafkaConfig, l *slog.Logger, sendFunc *func(ctx context.Context, msgs ...ckafka.Message) error) (*Notifier, error) {
n := &Notifier{
conf: c,
logger: l,
}

if sendFunc != nil {
n.sendFunc = *sendFunc
return n, nil
}

transport := ckafka.Transport{}

if c.SecurityProtocol != nil {
Expand Down Expand Up @@ -70,62 +79,60 @@ func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {
writer.WriteTimeout = 45 * time.Second
}

n := &Notifier{
conf: c,
logger: l,
writer: writer,
if c.NumberOfPartition > 0 {
n.partition = c.NumberOfPartition
} else {
n.partition = 1
}

if c.NumberOfPartition != nil {
n.numberOfPartition = *c.NumberOfPartition
} else {
n.numberOfPartition = 1
n.sendFunc = func(ctx context.Context, msgs ...ckafka.Message) error {
return writer.WriteMessages(ctx, msgs...)
}

return n, nil
}

// GetPartitionIndex returns the current partition index.
func (n *Notifier) GetPartitionIndex() int {
return n.partitionIndex
return n.partition
}

// NextPartition returns the next partition index.
func (n *Notifier) NextPartition() {
n.partitionIndexMutex.Lock()
n.partitionIndex = (n.partitionIndex + 1) % n.numberOfPartition
n.partitionIndexMutex.Unlock()
n.partitionMutex.Lock()
n.partition = (n.partition + 1) % n.conf.NumberOfPartition
n.partitionMutex.Unlock()
}

// Notify implements the Notifier interface.
func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
// Because retry is supported by kafka-go so it will be always false
var buf bytes.Buffer
var msgs []ckafka.Message
// Because retry is supported by kafka-go so it will be always false
shouldRetry := false

for _, alert := range as {
if err := json.NewEncoder(&buf).Encode(alert); err != nil {
slog.Log(ctx, slog.LevelError, fmt.Sprintf("Failed to marshal alert: %s", alert.Name()), "err", err)
}

if err := n.Produce(ctx, alert.Name(), buf.Bytes()); err != nil {
slog.Log(ctx, slog.LevelError, fmt.Sprintf("Failed to produce alert: %s", alert.Name()), "err", err)
message := ckafka.Message{
Key: []byte(alert.Name()),
Value: buf.Bytes(),
}
}

return shouldRetry, nil
}
if n.conf.NumberOfPartition > 0 {
message.Partition = n.GetPartitionIndex()
n.NextPartition()
}

// Produce sends a message to Kafka.
func (n *Notifier) Produce(ctx context.Context, key string, value []byte) error {
message := ckafka.Message{
Key: []byte(key),
Value: value,
msgs = append(msgs, message)
}

if n.conf.NumberOfPartition != nil {
message.Partition = n.GetPartitionIndex()
if err := n.sendFunc(ctx, msgs...); err != nil {
slog.Log(ctx, slog.LevelError, "Failed to send message", "err", err)
return shouldRetry, err
}

return n.writer.WriteMessages(ctx, message)
return shouldRetry, nil
}
95 changes: 95 additions & 0 deletions notify/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2025 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"context"
"sync"
"testing"

"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"

"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/types"

ckafka "github.com/segmentio/kafka-go"
)

func TestKafkaNotify(t *testing.T) {
var counter int
sendFunc := func(ctx context.Context, msgs ...ckafka.Message) error {
counter++
return nil
}

notifier, err := New(
&config.KafkaConfig{
Brokers: []string{"localhost:9092"},
},
promslog.NewNopLogger(),
&sendFunc,
)

require.NoError(t, err)
require.NotNil(t, notifier)

notifier.Notify(context.Background(), &types.Alert{})

require.Equal(t, 1, counter)
}

func TestKafkaNotifyRoundRobin(t *testing.T) {
var counter int
partitions := 2
sendFunc := func(ctx context.Context, msgs ...ckafka.Message) error {
require.Equal(t, counter%partitions+1, msgs[0].Partition)
counter++
return nil
}

notifier, err := New(
&config.KafkaConfig{
Brokers: []string{"localhost:9092"},
NumberOfPartition: partitions,
},
promslog.NewNopLogger(),
&sendFunc,
)

require.NoError(t, err)
require.NotNil(t, notifier)

var wg sync.WaitGroup

notifier.Notify(context.Background(), &types.Alert{})
notifier.Notify(context.Background(), &types.Alert{})
notifier.Notify(context.Background(), &types.Alert{})

wg.Add(1)
go func() {
defer wg.Done()
notifier.Notify(context.Background(), &types.Alert{})
}()

wg.Add(1)
go func() {
defer wg.Done()
notifier.Notify(context.Background(), &types.Alert{})
}()

notifier.Notify(context.Background(), &types.Alert{})

wg.Wait()
}
1 change: 1 addition & 0 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func (m *Metrics) InitializeFor(receiver map[string][]Integration) {
"msteamsv2",
"jira",
"rocketchat",
"kafka",
} {
m.numNotifications.WithLabelValues(integration)
m.numNotificationRequestsTotal.WithLabelValues(integration)
Expand Down

0 comments on commit 8cc6b2b

Please sign in to comment.