Skip to content

Commit

Permalink
chore(kafka-receiver): support config ssl
Browse files Browse the repository at this point in the history
  • Loading branch information
magiskboy committed Jan 14, 2025
1 parent 9068fd2 commit d7e8607
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
15 changes: 8 additions & 7 deletions config/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ var (

Brokers: []string{},
Topic: `{{ template "kafka.default.topic" . }}`,
NumberOfPartition: 0,
NumberOfPartition: nil,
}
)

Expand Down Expand Up @@ -1010,12 +1010,13 @@ func (c *RocketchatConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
type KafkaConfig struct {
NotifierConfig `yaml:",inline" json:",inline"`

Brokers []string `yaml:"brokers" json:"brokers"`
Topic string `yaml:"topic" json:"topic"`
NumberOfPartition int `yaml:"number_of_partitions" json:"number_of_partitions"`
UseSASL bool `yaml:"use_sasl" json:"use_sasl"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
Brokers []string `yaml:"brokers" json:"brokers"`
Topic string `yaml:"topic" json:"topic"`
NumberOfPartition *int `yaml:"number_of_partitions" json:"number_of_partitions"`
SecurityProtocol *string `yaml:"security_protocol" json:"security_protocol"`
Username *string `yaml:"username" json:"username"`
Password *string `yaml:"password" json:"password"`
Timeout *time.Duration `yaml:"timeout" json:"timeout"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand Down
36 changes: 26 additions & 10 deletions notify/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Notifier struct {
conf *config.KafkaConfig
logger *slog.Logger
writer *ckafka.Writer
numberOfPartition int
partitionIndex int
partitionIndexMutex sync.Mutex
}
Expand All @@ -45,15 +46,18 @@ type KafkaMessage struct {

// New returns a new Kafka notifier.
func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {
mechanism := plain.Mechanism{
Username: c.Username,
Password: c.Password,
}

transport := ckafka.Transport{
SASL: mechanism,
DialTimeout: 45 * time.Second,
TLS: &tls.Config{},
transport := ckafka.Transport{}

if c.SecurityProtocol != nil {
transport.TLS = &tls.Config{}

if *c.SecurityProtocol == "SASL_SSL" {
// default is PLAIN mechanism
transport.SASL = plain.Mechanism{
Username: *c.Username,
Password: *c.Password,
}
}
}

writer := &ckafka.Writer{
Expand All @@ -64,12 +68,24 @@ func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {
Transport: &transport,
}

if c.Timeout != nil {
writer.WriteTimeout = *c.Timeout
} else {
writer.WriteTimeout = 45 * time.Second
}

n := &Notifier{
conf: c,
logger: l,
writer: writer,
}

if c.NumberOfPartition != nil {
n.numberOfPartition = *c.NumberOfPartition
} else {
n.numberOfPartition = 1
}

return n, nil
}

Expand All @@ -81,7 +97,7 @@ func (n *Notifier) GetPartitionIndex() int {
// NextPartition returns the next partition index.
func (n *Notifier) NextPartition() {
n.partitionIndexMutex.Lock()
n.partitionIndex = (n.partitionIndex + 1) % n.conf.NumberOfPartition
n.partitionIndex = (n.partitionIndex + 1) % n.numberOfPartition
n.partitionIndexMutex.Unlock()
}

Expand Down

0 comments on commit d7e8607

Please sign in to comment.