4
4
"context"
5
5
"crypto/tls"
6
6
"crypto/x509"
7
+ "errors"
7
8
"io"
8
9
"log"
9
10
"os"
@@ -14,12 +15,16 @@ import (
14
15
_ "github.com/segmentio/kafka-go/lz4"
15
16
"github.com/segmentio/kafka-go/sasl/plain"
16
17
_ "github.com/segmentio/kafka-go/snappy"
18
+ "github.com/zeromicro/go-queue/kq/internal"
19
+ "github.com/zeromicro/go-zero/core/contextx"
20
+ "github.com/zeromicro/go-zero/core/logc"
17
21
"github.com/zeromicro/go-zero/core/logx"
18
22
"github.com/zeromicro/go-zero/core/queue"
19
23
"github.com/zeromicro/go-zero/core/service"
20
24
"github.com/zeromicro/go-zero/core/stat"
21
25
"github.com/zeromicro/go-zero/core/threading"
22
26
"github.com/zeromicro/go-zero/core/timex"
27
+ "go.opentelemetry.io/otel"
23
28
)
24
29
25
30
const (
@@ -29,12 +34,12 @@ const (
29
34
)
30
35
31
36
type (
32
- ConsumeHandle func (key , value string ) error
37
+ ConsumeHandle func (ctx context. Context , key , value string ) error
33
38
34
- ConsumeErrorHandler func (msg kafka.Message , err error )
39
+ ConsumeErrorHandler func (ctx context. Context , msg kafka.Message , err error )
35
40
36
41
ConsumeHandler interface {
37
- Consume (key , value string ) error
42
+ Consume (ctx context. Context , key , value string ) error
38
43
}
39
44
40
45
queueOptions struct {
54
59
channel chan kafka.Message
55
60
producerRoutines * threading.RoutineGroup
56
61
consumerRoutines * threading.RoutineGroup
62
+ commitRunner * threading.StableRunner [kafka.Message , kafka.Message ]
57
63
metrics * stat.Metrics
58
64
errorHandler ConsumeErrorHandler
59
65
}
@@ -143,7 +149,7 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
143
149
}
144
150
consumer := kafka .NewReader (readerConfig )
145
151
146
- return & kafkaQueue {
152
+ q := & kafkaQueue {
147
153
c : c ,
148
154
consumer : consumer ,
149
155
handler : handler ,
@@ -153,25 +159,51 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
153
159
metrics : options .metrics ,
154
160
errorHandler : options .errorHandler ,
155
161
}
162
+ if c .CommitInOrder {
163
+ q .commitRunner = threading .NewStableRunner (func (msg kafka.Message ) kafka.Message {
164
+ if err := q .consumeOne (context .Background (), string (msg .Key ), string (msg .Value )); err != nil {
165
+ if q .errorHandler != nil {
166
+ q .errorHandler (context .Background (), msg , err )
167
+ }
168
+ }
169
+
170
+ return msg
171
+ })
172
+ }
173
+
174
+ return q
156
175
}
157
176
158
177
func (q * kafkaQueue ) Start () {
159
- q . startConsumers ()
160
- q . startProducers ()
178
+ if q . c . CommitInOrder {
179
+ go q . commitInOrder ()
161
180
162
- q .producerRoutines .Wait ()
163
- close (q .channel )
164
- q .consumerRoutines .Wait ()
181
+ if err := q .consume (func (msg kafka.Message ) {
182
+ if e := q .commitRunner .Push (msg ); e != nil {
183
+ logx .Error (e )
184
+ }
185
+ }); err != nil {
186
+ logx .Error (err )
187
+ }
188
+ } else {
189
+ q .startConsumers ()
190
+ q .startProducers ()
191
+ q .producerRoutines .Wait ()
192
+ close (q .channel )
193
+ q .consumerRoutines .Wait ()
194
+
195
+ logx .Infof ("Consumer %s is closed" , q .c .Name )
196
+ }
165
197
}
166
198
167
199
// Stop closes the underlying Kafka reader and flushes pending log output.
func (q *kafkaQueue) Stop() {
	// Reader.Close can fail (e.g. underlying connection errors); log it
	// instead of silently discarding the error.
	if err := q.consumer.Close(); err != nil {
		logx.Errorf("Error on closing consumer, %v", err)
	}
	logx.Close()
}
171
203
172
- func (q * kafkaQueue ) consumeOne (key , val string ) error {
204
+ func (q * kafkaQueue ) consumeOne (ctx context. Context , key , val string ) error {
173
205
startTime := timex .Now ()
174
- err := q .handler .Consume (key , val )
206
+ err := q .handler .Consume (ctx , key , val )
175
207
q .metrics .Add (stat.Task {
176
208
Duration : timex .Since (startTime ),
177
209
})
@@ -182,18 +214,25 @@ func (q *kafkaQueue) startConsumers() {
182
214
for i := 0 ; i < q .c .Processors ; i ++ {
183
215
q .consumerRoutines .Run (func () {
184
216
for msg := range q .channel {
185
- if err := q .consumeOne (string (msg .Key ), string (msg .Value )); err != nil {
217
+ // wrap message into message carrier
218
+ mc := internal .NewMessageCarrier (internal .NewMessage (& msg ))
219
+ // extract trace context from message
220
+ ctx := otel .GetTextMapPropagator ().Extract (context .Background (), mc )
221
+ // remove deadline and error control
222
+ ctx = contextx .ValueOnlyFrom (ctx )
223
+
224
+ if err := q .consumeOne (ctx , string (msg .Key ), string (msg .Value )); err != nil {
186
225
if q .errorHandler != nil {
187
- q .errorHandler (msg , err )
226
+ q .errorHandler (ctx , msg , err )
188
227
}
189
228
190
229
if ! q .c .ForceCommit {
191
230
continue
192
231
}
193
232
}
194
233
195
- if err := q .consumer .CommitMessages (context . Background () , msg ); err != nil {
196
- logx .Errorf ("commit failed, error: %v" , err )
234
+ if err := q .consumer .CommitMessages (ctx , msg ); err != nil {
235
+ logc .Errorf (ctx , "commit failed, error: %v" , err )
197
236
}
198
237
}
199
238
})
@@ -202,25 +241,50 @@ func (q *kafkaQueue) startConsumers() {
202
241
203
242
func (q * kafkaQueue ) startProducers () {
204
243
for i := 0 ; i < q .c .Consumers ; i ++ {
244
+ i := i
205
245
q .producerRoutines .Run (func () {
206
- for {
207
- msg , err := q .consumer .FetchMessage (context .Background ())
208
- // io.EOF means consumer closed
209
- // io.ErrClosedPipe means committing messages on the consumer,
210
- // kafka will refire the messages on uncommitted messages, ignore
211
- if err == io .EOF || err == io .ErrClosedPipe {
212
- return
213
- }
214
- if err != nil {
215
- logx .Errorf ("Error on reading message, %q" , err .Error ())
216
- continue
217
- }
246
+ if err := q .consume (func (msg kafka.Message ) {
218
247
q .channel <- msg
248
+ }); err != nil {
249
+ logx .Infof ("Consumer %s-%d is closed, error: %q" , q .c .Name , i , err .Error ())
250
+ return
219
251
}
220
252
})
221
253
}
222
254
}
223
255
256
+ func (q * kafkaQueue ) consume (handle func (msg kafka.Message )) error {
257
+ for {
258
+ msg , err := q .consumer .FetchMessage (context .Background ())
259
+ // io.EOF means consumer closed
260
+ // io.ErrClosedPipe means committing messages on the consumer,
261
+ // kafka will refire the messages on uncommitted messages, ignore
262
+ if err == io .EOF || errors .Is (err , io .ErrClosedPipe ) {
263
+ return err
264
+ }
265
+ if err != nil {
266
+ logx .Errorf ("Error on reading message, %q" , err .Error ())
267
+ continue
268
+ }
269
+
270
+ handle (msg )
271
+ }
272
+ }
273
+
274
+ func (q * kafkaQueue ) commitInOrder () {
275
+ for {
276
+ msg , err := q .commitRunner .Get ()
277
+ if err != nil {
278
+ logx .Error (err )
279
+ return
280
+ }
281
+
282
+ if err := q .consumer .CommitMessages (context .Background (), msg ); err != nil {
283
+ logx .Errorf ("commit failed, error: %v" , err )
284
+ }
285
+ }
286
+ }
287
+
224
288
func (q kafkaQueues ) Start () {
225
289
for _ , each := range q .queues {
226
290
q .group .Add (each )
@@ -272,8 +336,8 @@ type innerConsumeHandler struct {
272
336
handle ConsumeHandle
273
337
}
274
338
275
- func (ch innerConsumeHandler ) Consume (k , v string ) error {
276
- return ch .handle (k , v )
339
+ func (ch innerConsumeHandler ) Consume (ctx context. Context , k , v string ) error {
340
+ return ch .handle (ctx , k , v )
277
341
}
278
342
279
343
func ensureQueueOptions (c KqConf , options * queueOptions ) {
@@ -290,8 +354,8 @@ func ensureQueueOptions(c KqConf, options *queueOptions) {
290
354
options .metrics = stat .NewMetrics (c .Name )
291
355
}
292
356
if options .errorHandler == nil {
293
- options .errorHandler = func (msg kafka.Message , err error ) {
294
- logx .Errorf ("consume: %s, error: %v" , string (msg .Value ), err )
357
+ options .errorHandler = func (ctx context. Context , msg kafka.Message , err error ) {
358
+ logc .Errorf (ctx , "consume: %s, error: %v" , string (msg .Value ), err )
295
359
}
296
360
}
297
361
}
0 commit comments