This repository has been archived by the owner on Jul 28, 2020. It is now read-only.
forked from ThreeDotsLabs/watermill-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmarshaler.go
94 lines (74 loc) · 2.43 KB
/
marshaler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package kafka
import (
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)
// UUIDHeaderKey is the Kafka record header key under which DefaultMarshaler
// stores the Watermill message UUID. DefaultMarshaler.Marshal rejects messages
// whose metadata already carries a non-empty value under this key, and
// DefaultMarshaler.Unmarshal reads the UUID back from it instead of copying
// it into the metadata.
const UUIDHeaderKey = "_watermill_message_uuid"
// Marshaler marshals Watermill's message to Kafka message.
//
// Marshal receives the destination topic and the Watermill message and
// returns the sarama producer message to publish, or an error if the message
// cannot be converted.
type Marshaler interface {
Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}
// Unmarshaler unmarshals Kafka's message to Watermill's message.
//
// Unmarshal receives a consumed sarama message and returns the corresponding
// Watermill message, or an error if the message cannot be converted.
type Unmarshaler interface {
Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}
// MarshalerUnmarshaler combines Marshaler and Unmarshaler, for types that
// convert messages in both directions.
type MarshalerUnmarshaler interface {
Marshaler
Unmarshaler
}
// DefaultMarshaler is a stateless Marshaler and Unmarshaler. It carries the
// message UUID and metadata in Kafka record headers and the payload in the
// Kafka message value.
type DefaultMarshaler struct{}
// Marshal converts a Watermill message into a sarama.ProducerMessage addressed
// to topic.
//
// The message UUID is written as the first record header under UUIDHeaderKey,
// followed by one header per metadata entry. Metadata headers are emitted in
// map iteration order, which Go leaves unspecified. The payload becomes the
// Kafka message value; no message key is set.
//
// An error is returned when the metadata already holds a non-empty value
// under UUIDHeaderKey, because that key is reserved for the UUID.
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
	if msg.Metadata.Get(UUIDHeaderKey) != "" {
		return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
	}

	// The final header count is known up front (UUID + every metadata entry),
	// so allocate once instead of growing the slice during the loop.
	headers := make([]sarama.RecordHeader, 0, len(msg.Metadata)+1)
	headers = append(headers, sarama.RecordHeader{
		Key:   []byte(UUIDHeaderKey),
		Value: []byte(msg.UUID),
	})
	for key, value := range msg.Metadata {
		headers = append(headers, sarama.RecordHeader{
			Key:   []byte(key),
			Value: []byte(value),
		})
	}

	return &sarama.ProducerMessage{
		Topic:   topic,
		Value:   sarama.ByteEncoder(msg.Payload),
		Headers: headers,
	}, nil
}
// Unmarshal rebuilds a Watermill message from a consumed Kafka message.
//
// The header stored under UUIDHeaderKey supplies the message UUID; every other
// header is copied into the message metadata. The Kafka message value becomes
// the payload. If no UUID header is present the UUID is left empty. The
// returned error is always nil.
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) {
	var uuid string
	meta := make(message.Metadata, len(kafkaMsg.Headers))

	for _, h := range kafkaMsg.Headers {
		name := string(h.Key)
		if name == UUIDHeaderKey {
			// The UUID header is not part of the user-visible metadata.
			uuid = string(h.Value)
			continue
		}
		meta.Set(name, string(h.Value))
	}

	result := message.NewMessage(uuid, kafkaMsg.Value)
	result.Metadata = meta

	return result, nil
}
// GeneratePartitionKey computes the partition key for msg being published to
// topic. The marshaler returned by NewWithPartitioningMarshaler uses the
// returned string as the Kafka message key; a non-nil error aborts marshaling.
type GeneratePartitionKey func(topic string, msg *message.Message) (string, error)
// kafkaJsonWithPartitioning is a MarshalerUnmarshaler that embeds
// DefaultMarshaler and additionally sets the Kafka message key from
// generatePartitionKey when marshaling. Unmarshal is the embedded
// DefaultMarshaler's method, unchanged.
//
// NOTE(review): despite the "Json" in the name, nothing in the visible code is
// JSON-specific.
type kafkaJsonWithPartitioning struct {
DefaultMarshaler
generatePartitionKey GeneratePartitionKey
}
// NewWithPartitioningMarshaler returns a MarshalerUnmarshaler that behaves
// like DefaultMarshaler but also sets the Kafka message key to the value
// produced by generatePartitionKey for each marshaled message.
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
	var m kafkaJsonWithPartitioning
	m.generatePartitionKey = generatePartitionKey
	return m
}
// Marshal builds the producer message with the embedded DefaultMarshaler and
// then sets its Key to the partition key generated for topic and msg.
//
// An error from DefaultMarshaler.Marshal is returned as is; an error from the
// partition key generator is wrapped with "cannot generate partition key".
func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
	producerMsg, marshalErr := j.DefaultMarshaler.Marshal(topic, msg)
	if marshalErr != nil {
		return nil, marshalErr
	}

	partitionKey, keyErr := j.generatePartitionKey(topic, msg)
	if keyErr != nil {
		return nil, errors.Wrap(keyErr, "cannot generate partition key")
	}

	producerMsg.Key = sarama.ByteEncoder(partitionKey)
	return producerMsg, nil
}