This repository has been archived by the owner on Jul 28, 2020. It is now read-only.
forked from ThreeDotsLabs/watermill-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher.go
134 lines (104 loc) · 2.88 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package kafka
import (
"time"
"github.com/Shopify/sarama"
"github.com/pkg/errors"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
type Publisher struct {
config PublisherConfig
producer sarama.SyncProducer
logger watermill.LoggerAdapter
closed bool
}
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
config PublisherConfig,
logger watermill.LoggerAdapter,
) (message.Publisher, error) {
config.setDefaults()
if err := config.Validate(); err != nil {
return nil, err
}
if logger == nil {
logger = watermill.NopLogger{}
}
producer, err := sarama.NewSyncProducer(config.Brokers, config.OverwriteSaramaConfig)
if err != nil {
return nil, errors.Wrap(err, "cannot create Kafka producer")
}
return &Publisher{
config: config,
producer: producer,
logger: logger,
}, nil
}
type PublisherConfig struct {
// Kafka brokers list.
Brokers []string
// Marshaler is used to marshal messages from Watermill format into Kafka format.
Marshaler Marshaler
// OverwriteSaramaConfig holds additional sarama settings.
OverwriteSaramaConfig *sarama.Config
}
func (c *PublisherConfig) setDefaults() {
if c.OverwriteSaramaConfig == nil {
c.OverwriteSaramaConfig = DefaultSaramaSyncPublisherConfig()
}
}
func (c PublisherConfig) Validate() error {
if len(c.Brokers) == 0 {
return errors.New("missing brokers")
}
if c.Marshaler == nil {
return errors.New("missing marshaler")
}
return nil
}
func DefaultSaramaSyncPublisherConfig() *sarama.Config {
config := sarama.NewConfig()
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
config.Version = sarama.V1_0_0_0
config.Metadata.Retry.Backoff = time.Second * 2
config.ClientID = "watermill"
return config
}
// Publish publishes message to Kafka.
//
// Publish is blocking and wait for ack from Kafka.
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
if p.closed {
return errors.New("publisher closed")
}
logFields := make(watermill.LogFields, 4)
logFields["topic"] = topic
for _, msg := range msgs {
logFields["message_uuid"] = msg.UUID
p.logger.Trace("Sending message to Kafka", logFields)
kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)
if err != nil {
return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
}
partition, offset, err := p.producer.SendMessage(kafkaMsg)
if err != nil {
return errors.Wrapf(err, "cannot produce message %s", msg.UUID)
}
logFields["kafka_partition"] = partition
logFields["kafka_partition_offset"] = offset
p.logger.Trace("Message sent to Kafka", logFields)
}
return nil
}
func (p *Publisher) Close() error {
if p.closed {
return nil
}
p.closed = true
if err := p.producer.Close(); err != nil {
return errors.Wrap(err, "cannot close Kafka producer")
}
return nil
}