-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathinput_kafka.go
183 lines (145 loc) · 4.08 KB
/
input_kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package goreplay
import (
"encoding/json"
"log"
"strconv"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
)
// KafkaInput is used for receiving Kafka messages and
// transforming them into HTTP payloads.
//
// Instances are created by NewKafkaInput, which starts one forwarding
// goroutine per topic partition; PluginRead hands the forwarded messages out
// one at a time.
type KafkaInput struct {
// config is the configuration passed to NewKafkaInput. NewKafkaInput
// overwrites its Offset field with the offset string it was given.
config *InputKafkaConfig
// consumers holds one partition consumer per partition of config.Topic,
// in the order the partitions were reported.
consumers []sarama.PartitionConsumer
// messages is the buffered channel (capacity 256) that the per-partition
// goroutines forward consumed messages into; PluginRead receives from it.
messages chan *sarama.ConsumerMessage
// speedFactor divides the delay computed by timeWait when it is not 1.
// NewKafkaInput initialises it to 1.
speedFactor float64
// quit is closed by Close; PluginRead returns ErrorStopped once it is closed.
quit chan struct{}
// kafkaTimer carries the timestamps timeWait remembers between messages.
kafkaTimer *kafkaTimer
}
// getOffsetOfPartitions converts the textual offset setting into the int64
// offset handed to ConsumePartition.
//
// The value must be a base-10 integer no smaller than -2 (sarama reserves -1
// and -2 for its OffsetNewest and OffsetOldest sentinels). Anything else
// terminates the process through log.Fatalln.
func getOffsetOfPartitions(offsetCfg string) int64 {
	parsed, parseErr := strconv.ParseInt(offsetCfg, 10, 64)
	if parseErr == nil && parsed >= -2 {
		return parsed
	}
	log.Fatalln("Failed to parse offset: "+offsetCfg, parseErr)
	return parsed
}
// NewKafkaInput creates a Kafka consumer input for config.Topic and starts
// one forwarding goroutine (plus one error-handling goroutine) per partition
// of that topic.
//
// offsetCfg is the starting offset as a base-10 string (see
// getOffsetOfPartitions); it is stored in config.Offset and used for every
// partition. When config.consumer holds a non-nil *mocks.Consumer it is used
// as-is; otherwise a new sarama consumer is created for the comma-separated
// broker list in config.Host, configured from config.SASLConfig and tlsConfig.
//
// Any failure while parsing the offset, connecting, listing partitions or
// starting a partition consumer terminates the process through log.Fatalln.
func NewKafkaInput(offsetCfg string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput {
	c := NewKafkaConfig(&config.SASLConfig, tlsConfig)

	var con sarama.Consumer
	if mock, ok := config.consumer.(*mocks.Consumer); ok && mock != nil {
		con = config.consumer
	} else {
		var err error
		con, err = sarama.NewConsumer(strings.Split(config.Host, ","), c)
		if err != nil {
			log.Fatalln("Failed to start Sarama(Kafka) consumer:", err)
		}
	}

	partitions, err := con.Partitions(config.Topic)
	if err != nil {
		log.Fatalln("Failed to collect Sarama(Kafka) partitions:", err)
	}

	i := &KafkaInput{
		config:      config,
		consumers:   make([]sarama.PartitionConsumer, len(partitions)),
		messages:    make(chan *sarama.ConsumerMessage, 256),
		speedFactor: 1,
		quit:        make(chan struct{}),
		kafkaTimer:  new(kafkaTimer),
	}
	i.config.Offset = offsetCfg

	// The offset is the same for every partition, so validate and parse it
	// once instead of on each loop iteration.
	offset := getOffsetOfPartitions(offsetCfg)

	for index, partition := range partitions {
		consumer, err := con.ConsumePartition(config.Topic, partition, offset)
		if err != nil {
			log.Fatalln("Failed to start Sarama(Kafka) partition consumer:", err)
		}

		go func(consumer sarama.PartitionConsumer) {
			// Closing the partition consumer on exit also closes its
			// Messages and Errors channels, which lets ErrorHandler return.
			defer consumer.Close()
			for {
				select {
				case <-i.quit:
					return
				case message, ok := <-consumer.Messages():
					if !ok {
						return
					}
					// Do not block forever on a full buffer once the input
					// has been closed and nobody calls PluginRead any more.
					select {
					case i.messages <- message:
					case <-i.quit:
						return
					}
				}
			}
		}(consumer)

		go i.ErrorHandler(consumer)

		i.consumers[index] = consumer
	}

	// NOTE(review): the parent consumer `con` is not stored on KafkaInput, so
	// nothing in this file closes it; confirm whether that is intentional.
	return i
}
// ErrorHandler drains the error channel of a single partition consumer and
// reports every error through Debug at level 1. It returns only when that
// channel is closed, so NewKafkaInput runs it in its own goroutine.
func (i *KafkaInput) ErrorHandler(consumer sarama.PartitionConsumer) {
	errs := consumer.Errors()
	for {
		consumerErr, open := <-errs
		if !open {
			return
		}
		Debug(1, "Failed to read access log entry:", consumerErr)
	}
}
// PluginRead blocks until a Kafka message is available or the input has been
// closed, and converts the message into a *Message.
//
// It returns ErrorStopped when the quit channel is closed. When
// config.UseJSON is set, the message value is decoded as a KafkaMessage and
// re-serialised with Dump; a failure of either step is logged through Debug
// and returned. If the resulting data is an origin payload, its meta part is
// split off into msg.Meta. Before returning, timeWait is called with the
// message timestamp: the third meta field when there is one, otherwise the
// JSON ReqTs, otherwise the empty string.
func (i *KafkaInput) PluginRead() (*Message, error) {
	var message *sarama.ConsumerMessage
	select {
	case <-i.quit:
		return nil, ErrorStopped
	case message = <-i.messages:
	}

	var msg Message
	inputTs := ""
	msg.Data = message.Value

	if i.config.UseJSON {
		var kafkaMessage KafkaMessage
		// A value that is not valid JSON used to be ignored, producing a
		// payload built from a zero KafkaMessage; report it instead.
		err := json.Unmarshal(message.Value, &kafkaMessage)
		if err != nil {
			Debug(1, "[INPUT-KAFKA] failed to decode access log entry:", err)
			return nil, err
		}

		inputTs = kafkaMessage.ReqTs
		msg.Data, err = kafkaMessage.Dump()
		if err != nil {
			Debug(1, "[INPUT-KAFKA] failed to decode access log entry:", err)
			return nil, err
		}
	}

	// does it have meta
	if isOriginPayload(msg.Data) {
		msg.Meta, msg.Data = payloadMetaWithBody(msg.Data)
		// Only read the timestamp field when the meta line really has it;
		// indexing a shorter meta line would panic.
		if meta := payloadMeta(msg.Meta); len(meta) > 2 {
			inputTs = string(meta[2])
		}
	}

	i.timeWait(inputTs)

	return &msg, nil
}
// String describes this input as "Kafka Input: <host>/<topic>", using the
// Host and Topic values from its configuration.
func (i *KafkaInput) String() string {
	cfg := i.config
	location := cfg.Host + "/" + cfg.Topic
	return "Kafka Input: " + location
}
// Close stops this plugin by closing the quit channel, after which
// PluginRead returns ErrorStopped. It always returns nil.
//
// Calling Close again is a no-op rather than a "close of closed channel"
// panic. The check and the close are not synchronised with each other, so
// Close should not be called from several goroutines at the same time.
func (i *KafkaInput) Close() error {
	select {
	case <-i.quit:
		// Already closed by an earlier call.
	default:
		close(i.quit)
	}
	return nil
}
// timeWait delays the caller so that consecutive messages leave this input
// with the spacing their timestamps had.
//
// curInputTs is the message timestamp as a base-10 integer string. It is
// compared against time.Now().UnixNano(), so it is presumably expressed in
// Unix nanoseconds (confirm against the producer). Nothing happens when
// curInputTs is empty or config.Offset is "-1". A timestamp that cannot be
// parsed is logged and the message is let through without delay.
//
// The first usable timestamp only primes the timer. Afterwards the delay is
// the gap between this and the previous message timestamp, minus the
// wall-clock time already elapsed since the previous message was released.
func (i *KafkaInput) timeWait(curInputTs string) {
	if i.config.Offset == "-1" || curInputTs == "" {
		return
	}

	// Validate before the value is used. The timestamp comes from message
	// data, so a malformed one must not terminate the whole process.
	curInput, err := strconv.ParseInt(curInputTs, 10, 64)
	if err != nil {
		log.Println("Failed to parse timestamp, emitting without delay:", err)
		return
	}

	// implement for Kafka input slowdown or speedup emitting
	timer := i.kafkaTimer
	curTs := time.Now().UnixNano()

	if timer.latestInputTs == 0 || timer.latestOutputTs == 0 {
		timer.latestInputTs = curInput
		timer.latestOutputTs = curTs
		return
	}

	diffTs := curInput - timer.latestInputTs
	pastTs := curTs - timer.latestOutputTs

	diff := diffTs - pastTs
	// NOTE(review): this scales the already-elapsed wall time together with
	// the timestamp gap; confirm that is intended for speedFactor != 1.
	if i.speedFactor != 1 {
		diff = int64(float64(diff) / i.speedFactor)
	}

	if diff > 0 {
		time.Sleep(time.Duration(diff))
		// Remember when the message is actually released. Keeping the
		// pre-sleep time would make the next call count this sleep as
		// elapsed time and under-delay the following message.
		curTs = time.Now().UnixNano()
	}

	timer.latestInputTs = curInput
	timer.latestOutputTs = curTs
}
// kafkaTimer holds the state timeWait carries from one message to the next
// in order to reproduce the spacing between message timestamps.
type kafkaTimer struct {
// latestInputTs is the timestamp of the last message timeWait processed,
// as parsed from its base-10 string form. Zero is treated as "not set yet".
latestInputTs int64
// latestOutputTs is a time.Now().UnixNano() reading stored by timeWait
// when it handled that message. Zero is treated as "not set yet".
latestOutputTs int64
}