@@ -195,12 +195,10 @@ func (k *KinesisSource) decodeFromSubscription(record []byte) ([]CloudwatchSubsc
195
195
return subscriptionRecord .LogEvents , nil
196
196
}
197
197
198
- func (k * KinesisSource ) WaitForConsumerDeregistration (consumerName string , streamARN string ) error {
198
+ func (k * KinesisSource ) WaitForConsumerDeregistration (ctx context. Context , consumerName string , streamARN string ) error {
199
199
maxTries := k .Config .MaxRetries
200
200
for i := range maxTries {
201
- _ , err := k .kClient .DescribeStreamConsumer (
202
- context .TODO (),
203
- & kinesis.DescribeStreamConsumerInput {
201
+ _ , err := k .kClient .DescribeStreamConsumer (ctx , & kinesis.DescribeStreamConsumerInput {
204
202
ConsumerName : aws .String (consumerName ),
205
203
StreamARN : aws .String (streamARN ),
206
204
})
@@ -221,11 +219,9 @@ func (k *KinesisSource) WaitForConsumerDeregistration(consumerName string, strea
221
219
return fmt .Errorf ("consumer %s is not deregistered after %d tries" , consumerName , maxTries )
222
220
}
223
221
224
- func (k * KinesisSource ) DeregisterConsumer () error {
222
+ func (k * KinesisSource ) DeregisterConsumer (ctx context. Context ) error {
225
223
k .logger .Debugf ("Deregistering consumer %s if it exists" , k .Config .ConsumerName )
226
- _ , err := k .kClient .DeregisterStreamConsumer (
227
- context .TODO (),
228
- & kinesis.DeregisterStreamConsumerInput {
224
+ _ , err := k .kClient .DeregisterStreamConsumer (ctx , & kinesis.DeregisterStreamConsumerInput {
229
225
ConsumerName : aws .String (k .Config .ConsumerName ),
230
226
StreamARN : aws .String (k .Config .StreamARN ),
231
227
})
@@ -239,20 +235,18 @@ func (k *KinesisSource) DeregisterConsumer() error {
239
235
return fmt .Errorf ("cannot deregister stream consumer: %w" , err )
240
236
}
241
237
242
- err = k .WaitForConsumerDeregistration (k .Config .ConsumerName , k .Config .StreamARN )
238
+ err = k .WaitForConsumerDeregistration (ctx , k .Config .ConsumerName , k .Config .StreamARN )
243
239
if err != nil {
244
240
return fmt .Errorf ("cannot wait for consumer deregistration: %w" , err )
245
241
}
246
242
247
243
return nil
248
244
}
249
245
250
- func (k * KinesisSource ) WaitForConsumerRegistration (consumerARN string ) error {
246
+ func (k * KinesisSource ) WaitForConsumerRegistration (ctx context. Context , consumerARN string ) error {
251
247
maxTries := k .Config .MaxRetries
252
248
for i := range maxTries {
253
- describeOutput , err := k .kClient .DescribeStreamConsumer (
254
- context .TODO (),
255
- & kinesis.DescribeStreamConsumerInput {
249
+ describeOutput , err := k .kClient .DescribeStreamConsumer (ctx , & kinesis.DescribeStreamConsumerInput {
256
250
ConsumerARN : aws .String (consumerARN ),
257
251
})
258
252
if err != nil {
@@ -271,20 +265,18 @@ func (k *KinesisSource) WaitForConsumerRegistration(consumerARN string) error {
271
265
return fmt .Errorf ("consumer %s is not active after %d tries" , consumerARN , maxTries )
272
266
}
273
267
274
- func (k * KinesisSource ) RegisterConsumer () (* kinesis.RegisterStreamConsumerOutput , error ) {
268
+ func (k * KinesisSource ) RegisterConsumer (ctx context. Context ) (* kinesis.RegisterStreamConsumerOutput , error ) {
275
269
k .logger .Debugf ("Registering consumer %s" , k .Config .ConsumerName )
276
270
277
- streamConsumer , err := k .kClient .RegisterStreamConsumer (
278
- context .TODO (),
279
- & kinesis.RegisterStreamConsumerInput {
271
+ streamConsumer , err := k .kClient .RegisterStreamConsumer (ctx , & kinesis.RegisterStreamConsumerInput {
280
272
ConsumerName : aws .String (k .Config .ConsumerName ),
281
273
StreamARN : aws .String (k .Config .StreamARN ),
282
274
})
283
275
if err != nil {
284
276
return nil , fmt .Errorf ("cannot register stream consumer: %w" , err )
285
277
}
286
278
287
- err = k .WaitForConsumerRegistration (* streamConsumer .Consumer .ConsumerARN )
279
+ err = k .WaitForConsumerRegistration (ctx , * streamConsumer .Consumer .ConsumerARN )
288
280
if err != nil {
289
281
return nil , fmt .Errorf ("timeout while waiting for consumer to be active: %w" , err )
290
282
}
@@ -378,10 +370,8 @@ func (k *KinesisSource) ReadFromSubscription(reader kinesis.SubscribeToShardEven
378
370
}
379
371
}
380
372
381
- func (k * KinesisSource ) SubscribeToShards (arn arn.ARN , streamConsumer * kinesis.RegisterStreamConsumerOutput , out chan types.Event ) error {
382
- shards , err := k .kClient .ListShards (
383
- context .TODO (),
384
- & kinesis.ListShardsInput {
373
+ func (k * KinesisSource ) SubscribeToShards (ctx context.Context , arn arn.ARN , streamConsumer * kinesis.RegisterStreamConsumerOutput , out chan types.Event ) error {
374
+ shards , err := k .kClient .ListShards (ctx , & kinesis.ListShardsInput {
385
375
StreamName : aws .String (arn .Resource [7 :]),
386
376
})
387
377
if err != nil {
@@ -391,9 +381,7 @@ func (k *KinesisSource) SubscribeToShards(arn arn.ARN, streamConsumer *kinesis.R
391
381
for _ , shard := range shards .Shards {
392
382
shardID := * shard .ShardId
393
383
394
- r , err := k .kClient .SubscribeToShard (
395
- context .TODO (),
396
- & kinesis.SubscribeToShardInput {
384
+ r , err := k .kClient .SubscribeToShard (ctx , & kinesis.SubscribeToShardInput {
397
385
ShardId : aws .String (shardID ),
398
386
StartingPosition : & kinTypes.StartingPosition {Type : kinTypes .ShardIteratorTypeLatest },
399
387
ConsumerARN : streamConsumer .Consumer .ConsumerARN ,
@@ -410,7 +398,7 @@ func (k *KinesisSource) SubscribeToShards(arn arn.ARN, streamConsumer *kinesis.R
410
398
return nil
411
399
}
412
400
413
- func (k * KinesisSource ) EnhancedRead (out chan types.Event , t * tomb.Tomb ) error {
401
+ func (k * KinesisSource ) EnhancedRead (ctx context. Context , out chan types.Event , t * tomb.Tomb ) error {
414
402
parsedARN , err := arn .Parse (k .Config .StreamARN )
415
403
if err != nil {
416
404
return fmt .Errorf ("cannot parse stream ARN: %w" , err )
@@ -423,20 +411,20 @@ func (k *KinesisSource) EnhancedRead(out chan types.Event, t *tomb.Tomb) error {
423
411
k .logger = k .logger .WithField ("stream" , parsedARN .Resource [7 :])
424
412
k .logger .Info ("starting kinesis acquisition with enhanced fan-out" )
425
413
426
- err = k .DeregisterConsumer ()
414
+ err = k .DeregisterConsumer (ctx )
427
415
if err != nil {
428
416
return fmt .Errorf ("cannot deregister consumer: %w" , err )
429
417
}
430
418
431
- streamConsumer , err := k .RegisterConsumer ()
419
+ streamConsumer , err := k .RegisterConsumer (ctx )
432
420
if err != nil {
433
421
return fmt .Errorf ("cannot register consumer: %w" , err )
434
422
}
435
423
436
424
for {
437
425
k .shardReaderTomb = & tomb.Tomb {}
438
426
439
- err = k .SubscribeToShards (parsedARN , streamConsumer , out )
427
+ err = k .SubscribeToShards (ctx , parsedARN , streamConsumer , out )
440
428
if err != nil {
441
429
return fmt .Errorf ("cannot subscribe to shards: %w" , err )
442
430
}
@@ -446,7 +434,7 @@ func (k *KinesisSource) EnhancedRead(out chan types.Event, t *tomb.Tomb) error {
446
434
k .shardReaderTomb .Kill (nil )
447
435
_ = k .shardReaderTomb .Wait () // we don't care about the error as we kill the tomb ourselves
448
436
449
- err = k .DeregisterConsumer ()
437
+ err = k .DeregisterConsumer (ctx )
450
438
if err != nil {
451
439
return fmt .Errorf ("cannot deregister consumer: %w" , err )
452
440
}
@@ -466,12 +454,11 @@ func (k *KinesisSource) EnhancedRead(out chan types.Event, t *tomb.Tomb) error {
466
454
}
467
455
}
468
456
469
- func (k * KinesisSource ) ReadFromShard (out chan types.Event , shardID string ) error {
457
+ func (k * KinesisSource ) ReadFromShard (ctx context. Context , out chan types.Event , shardID string ) error {
470
458
logger := k .logger .WithField ("shard" , shardID )
471
459
logger .Debugf ("Starting to read shard" )
472
460
473
- sharIt , err := k .kClient .GetShardIterator (
474
- context .TODO (),
461
+ sharIt , err := k .kClient .GetShardIterator (ctx ,
475
462
& kinesis.GetShardIteratorInput {
476
463
ShardId : aws .String (shardID ),
477
464
StreamName : & k .Config .StreamName ,
@@ -489,7 +476,7 @@ func (k *KinesisSource) ReadFromShard(out chan types.Event, shardID string) erro
489
476
for {
490
477
select {
491
478
case <- ticker .C :
492
- records , err := k .kClient .GetRecords (context . TODO () , & kinesis.GetRecordsInput {ShardIterator : it })
479
+ records , err := k .kClient .GetRecords (ctx , & kinesis.GetRecordsInput {ShardIterator : it })
493
480
it = records .NextShardIterator
494
481
495
482
var throughputErr * kinTypes.ProvisionedThroughputExceededException
@@ -526,14 +513,12 @@ func (k *KinesisSource) ReadFromShard(out chan types.Event, shardID string) erro
526
513
}
527
514
}
528
515
529
- func (k * KinesisSource ) ReadFromStream (out chan types.Event , t * tomb.Tomb ) error {
516
+ func (k * KinesisSource ) ReadFromStream (ctx context. Context , out chan types.Event , t * tomb.Tomb ) error {
530
517
k .logger = k .logger .WithField ("stream" , k .Config .StreamName )
531
518
k .logger .Info ("starting kinesis acquisition from shards" )
532
519
533
520
for {
534
- shards , err := k .kClient .ListShards (
535
- context .TODO (),
536
- & kinesis.ListShardsInput {
521
+ shards , err := k .kClient .ListShards (ctx , & kinesis.ListShardsInput {
537
522
StreamName : aws .String (k .Config .StreamName ),
538
523
})
539
524
if err != nil {
@@ -547,7 +532,7 @@ func (k *KinesisSource) ReadFromStream(out chan types.Event, t *tomb.Tomb) error
547
532
548
533
k .shardReaderTomb .Go (func () error {
549
534
defer trace .CatchPanic ("crowdsec/acquis/kinesis/streaming/shard" )
550
- return k .ReadFromShard (out , shardID )
535
+ return k .ReadFromShard (ctx , out , shardID )
551
536
})
552
537
}
553
538
select {
@@ -571,15 +556,15 @@ func (k *KinesisSource) ReadFromStream(out chan types.Event, t *tomb.Tomb) error
571
556
}
572
557
}
573
558
574
- func (k * KinesisSource ) StreamingAcquisition (_ context.Context , out chan types.Event , t * tomb.Tomb ) error {
559
+ func (k * KinesisSource ) StreamingAcquisition (ctx context.Context , out chan types.Event , t * tomb.Tomb ) error {
575
560
t .Go (func () error {
576
561
defer trace .CatchPanic ("crowdsec/acquis/kinesis/streaming" )
577
562
578
563
if k .Config .UseEnhancedFanOut {
579
- return k .EnhancedRead (out , t )
564
+ return k .EnhancedRead (ctx , out , t )
580
565
}
581
566
582
- return k .ReadFromStream (out , t )
567
+ return k .ReadFromStream (ctx , out , t )
583
568
})
584
569
585
570
return nil
0 commit comments