Skip to content

Commit bb57fd4

Browse files
committed
wip
1 parent e65bb44 commit bb57fd4

File tree

5 files changed

+106
-82
lines changed

5 files changed

+106
-82
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ require (
1414
github.com/aws/aws-sdk-go v1.52.0
1515
github.com/aws/aws-sdk-go-v2 v1.38.3
1616
github.com/aws/aws-sdk-go-v2/config v1.31.6
17+
github.com/aws/aws-sdk-go-v2/credentials v1.18.10 // indirect
1718
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.19.4
19+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.40.1
1820
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.3
1921
github.com/aws/aws-sdk-go-v2/service/sqs v1.42.3
2022
github.com/beevik/etree v1.4.1
@@ -107,7 +109,6 @@ require (
107109
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
108110
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
109111
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1 // indirect
110-
github.com/aws/aws-sdk-go-v2/credentials v1.18.10 // indirect
111112
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.6 // indirect
112113
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.6 // indirect
113114
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.6 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.6 h1:LHS1YAIJX
7676
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.6/go.mod h1:c9PCiTEuh0wQID5/KqA32J+HAgZxN9tOGXKCiYJjTZI=
7777
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.6 h1:nEXUSAwyUfLTgnc9cxlDWy637qsq4UWwp3sNAfl0Z3Y=
7878
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.6/go.mod h1:HGzIULx4Ge3Do2V0FaiYKcyKzOqwrhUZgCI77NisswQ=
79+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.40.1 h1:9QC0AF6gakV1TZuGp3NEUNl/6gXt3rfIifnxd+dWwbw=
80+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.40.1/go.mod h1:UpSQbmXxFiDGDrvqsTgEm3YijDf9cg/Ti+s2W0SeFEU=
7981
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.3 h1:ETkfWcXP2KNPLecaDa++5bsQhCRa5M5sLUJa5DWYIIg=
8082
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.3/go.mod h1:+/3ZTqoYb3Ur7DObD00tarKMLMuKg8iqz5CHEanqTnw=
8183
github.com/aws/aws-sdk-go-v2/service/sqs v1.42.3 h1:0dWg1Tkz3FnEo48DgAh7CT22hYyMShly8WMd3sGx0xI=

pkg/acquisition/modules/kinesis/kinesis.go

Lines changed: 86 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ import (
1111
"strings"
1212
"time"
1313

14-
"github.com/aws/aws-sdk-go/aws"
15-
"github.com/aws/aws-sdk-go/aws/arn"
16-
"github.com/aws/aws-sdk-go/aws/session"
17-
"github.com/aws/aws-sdk-go/service/kinesis"
14+
"github.com/aws/aws-sdk-go-v2/aws"
15+
"github.com/aws/aws-sdk-go-v2/aws/arn"
16+
"github.com/aws/aws-sdk-go-v2/config"
17+
"github.com/aws/aws-sdk-go-v2/service/kinesis"
18+
kinTypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
19+
1820
yaml "github.com/goccy/go-yaml"
1921
"github.com/prometheus/client_golang/prometheus"
2022
log "github.com/sirupsen/logrus"
@@ -45,7 +47,7 @@ type KinesisSource struct {
4547
metricsLevel metrics.AcquisitionMetricsLevel
4648
Config KinesisConfiguration
4749
logger *log.Entry
48-
kClient *kinesis.Kinesis
50+
kClient *kinesis.Client
4951
shardReaderTomb *tomb.Tomb
5052
}
5153

@@ -68,39 +70,32 @@ func (k *KinesisSource) GetUuid() string {
6870
return k.Config.UniqueId
6971
}
7072

71-
func (k *KinesisSource) newClient() error {
72-
var sess *session.Session
73-
74-
if k.Config.AwsProfile != nil {
75-
sess = session.Must(session.NewSessionWithOptions(session.Options{
76-
SharedConfigState: session.SharedConfigEnable,
77-
Profile: *k.Config.AwsProfile,
78-
}))
79-
} else {
80-
sess = session.Must(session.NewSessionWithOptions(session.Options{
81-
SharedConfigState: session.SharedConfigEnable,
82-
}))
73+
func (k *KinesisSource) newClient(ctx context.Context) error {
74+
var loadOpts []func(*config.LoadOptions) error
75+
if k.Config.AwsProfile != nil && *k.Config.AwsProfile != "" {
76+
loadOpts = append(loadOpts, config.WithSharedConfigProfile(*k.Config.AwsProfile))
8377
}
84-
85-
if sess == nil {
86-
return errors.New("failed to create aws session")
78+
region := k.Config.AwsRegion
79+
if region == "" {
80+
region = "us-east-1"
8781
}
82+
loadOpts = append(loadOpts, config.WithRegion(region))
83+
loadOpts = append(loadOpts, config.WithCredentialsProvider(aws.AnonymousCredentials{}))
8884

89-
config := aws.NewConfig()
90-
91-
if k.Config.AwsRegion != "" {
92-
config = config.WithRegion(k.Config.AwsRegion)
85+
cfg, err := config.LoadDefaultConfig(ctx, loadOpts...)
86+
if err != nil {
87+
return fmt.Errorf("failed to load aws config: %w", err)
9388
}
9489

90+
var clientOpts []func(*kinesis.Options)
9591
if k.Config.AwsEndpoint != "" {
96-
config = config.WithEndpoint(k.Config.AwsEndpoint)
92+
clientOpts = append(clientOpts, func(o *kinesis.Options) {
93+
o.BaseEndpoint = aws.String(k.Config.AwsEndpoint)
94+
})
9795
}
9896

99-
k.kClient = kinesis.New(sess, config)
100-
if k.kClient == nil {
101-
return errors.New("failed to create kinesis client")
102-
}
10397

98+
k.kClient = kinesis.NewFromConfig(cfg, clientOpts...)
10499
return nil
105100
}
106101

@@ -156,7 +151,7 @@ func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry, metricsL
156151
return err
157152
}
158153

159-
err = k.newClient()
154+
err = k.newClient(context.TODO())
160155
if err != nil {
161156
return fmt.Errorf("cannot create kinesis client: %w", err)
162157
}
@@ -203,12 +198,14 @@ func (k *KinesisSource) decodeFromSubscription(record []byte) ([]CloudwatchSubsc
203198
func (k *KinesisSource) WaitForConsumerDeregistration(consumerName string, streamARN string) error {
204199
maxTries := k.Config.MaxRetries
205200
for i := range maxTries {
206-
_, err := k.kClient.DescribeStreamConsumer(&kinesis.DescribeStreamConsumerInput{
207-
ConsumerName: aws.String(consumerName),
208-
StreamARN: aws.String(streamARN),
209-
})
201+
_, err := k.kClient.DescribeStreamConsumer(
202+
context.TODO(),
203+
&kinesis.DescribeStreamConsumerInput{
204+
ConsumerName: aws.String(consumerName),
205+
StreamARN: aws.String(streamARN),
206+
})
210207

211-
var resourceNotFoundErr *kinesis.ResourceNotFoundException
208+
var resourceNotFoundErr *kinTypes.ResourceNotFoundException
212209
if errors.As(err, &resourceNotFoundErr) {
213210
return nil
214211
}
@@ -226,12 +223,14 @@ func (k *KinesisSource) WaitForConsumerDeregistration(consumerName string, strea
226223

227224
func (k *KinesisSource) DeregisterConsumer() error {
228225
k.logger.Debugf("Deregistering consumer %s if it exists", k.Config.ConsumerName)
229-
_, err := k.kClient.DeregisterStreamConsumer(&kinesis.DeregisterStreamConsumerInput{
230-
ConsumerName: aws.String(k.Config.ConsumerName),
231-
StreamARN: aws.String(k.Config.StreamARN),
232-
})
226+
_, err := k.kClient.DeregisterStreamConsumer(
227+
context.TODO(),
228+
&kinesis.DeregisterStreamConsumerInput{
229+
ConsumerName: aws.String(k.Config.ConsumerName),
230+
StreamARN: aws.String(k.Config.StreamARN),
231+
})
233232

234-
var resourceNotFoundErr *kinesis.ResourceNotFoundException
233+
var resourceNotFoundErr *kinTypes.ResourceNotFoundException
235234
if errors.As(err, &resourceNotFoundErr) {
236235
return nil
237236
}
@@ -251,14 +250,16 @@ func (k *KinesisSource) DeregisterConsumer() error {
251250
func (k *KinesisSource) WaitForConsumerRegistration(consumerARN string) error {
252251
maxTries := k.Config.MaxRetries
253252
for i := range maxTries {
254-
describeOutput, err := k.kClient.DescribeStreamConsumer(&kinesis.DescribeStreamConsumerInput{
255-
ConsumerARN: aws.String(consumerARN),
256-
})
253+
describeOutput, err := k.kClient.DescribeStreamConsumer(
254+
context.TODO(),
255+
&kinesis.DescribeStreamConsumerInput{
256+
ConsumerARN: aws.String(consumerARN),
257+
})
257258
if err != nil {
258259
return fmt.Errorf("cannot describe stream consumer: %w", err)
259260
}
260261

261-
if *describeOutput.ConsumerDescription.ConsumerStatus == "ACTIVE" {
262+
if describeOutput.ConsumerDescription.ConsumerStatus == "ACTIVE" {
262263
k.logger.Debugf("Consumer %s is active", consumerARN)
263264
return nil
264265
}
@@ -273,10 +274,12 @@ func (k *KinesisSource) WaitForConsumerRegistration(consumerARN string) error {
273274
func (k *KinesisSource) RegisterConsumer() (*kinesis.RegisterStreamConsumerOutput, error) {
274275
k.logger.Debugf("Registering consumer %s", k.Config.ConsumerName)
275276

276-
streamConsumer, err := k.kClient.RegisterStreamConsumer(&kinesis.RegisterStreamConsumerInput{
277-
ConsumerName: aws.String(k.Config.ConsumerName),
278-
StreamARN: aws.String(k.Config.StreamARN),
279-
})
277+
streamConsumer, err := k.kClient.RegisterStreamConsumer(
278+
context.TODO(),
279+
&kinesis.RegisterStreamConsumerInput{
280+
ConsumerName: aws.String(k.Config.ConsumerName),
281+
StreamARN: aws.String(k.Config.StreamARN),
282+
})
280283
if err != nil {
281284
return nil, fmt.Errorf("cannot register stream consumer: %w", err)
282285
}
@@ -289,7 +292,7 @@ func (k *KinesisSource) RegisterConsumer() (*kinesis.RegisterStreamConsumerOutpu
289292
return streamConsumer, nil
290293
}
291294

292-
func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan types.Event, logger *log.Entry, shardID string) {
295+
func (k *KinesisSource) ParseAndPushRecords(records []kinTypes.Record, out chan types.Event, logger *log.Entry, shardID string) {
293296
for _, record := range records {
294297
if k.Config.StreamARN != "" {
295298
if k.metricsLevel != metrics.AcquisitionMetricsLevelNone {
@@ -365,38 +368,42 @@ func (k *KinesisSource) ReadFromSubscription(reader kinesis.SubscribeToShardEven
365368
return nil
366369
}
367370

368-
switch event := event.(type) {
369-
case *kinesis.SubscribeToShardEvent:
370-
k.ParseAndPushRecords(event.Records, out, logger, shardID)
371-
case *kinesis.SubscribeToShardEventStreamUnknownEvent:
372-
logger.Infof("got an unknown event, what to do ?")
371+
switch et := event.(type) {
372+
case *kinTypes.SubscribeToShardEventStreamMemberSubscribeToShardEvent:
373+
k.ParseAndPushRecords(et.Value.Records, out, logger, shardID)
374+
default:
375+
logger.Infof("unhandled SubscribeToShard event: %T", et)
373376
}
374377
}
375378
}
376379
}
377380

378381
func (k *KinesisSource) SubscribeToShards(arn arn.ARN, streamConsumer *kinesis.RegisterStreamConsumerOutput, out chan types.Event) error {
379-
shards, err := k.kClient.ListShards(&kinesis.ListShardsInput{
380-
StreamName: aws.String(arn.Resource[7:]),
381-
})
382+
shards, err := k.kClient.ListShards(
383+
context.TODO(),
384+
&kinesis.ListShardsInput{
385+
StreamName: aws.String(arn.Resource[7:]),
386+
})
382387
if err != nil {
383388
return fmt.Errorf("cannot list shards for enhanced_read: %w", err)
384389
}
385390

386391
for _, shard := range shards.Shards {
387392
shardID := *shard.ShardId
388393

389-
r, err := k.kClient.SubscribeToShard(&kinesis.SubscribeToShardInput{
390-
ShardId: aws.String(shardID),
391-
StartingPosition: &kinesis.StartingPosition{Type: aws.String(kinesis.ShardIteratorTypeLatest)},
392-
ConsumerARN: streamConsumer.Consumer.ConsumerARN,
393-
})
394+
r, err := k.kClient.SubscribeToShard(
395+
context.TODO(),
396+
&kinesis.SubscribeToShardInput{
397+
ShardId: aws.String(shardID),
398+
StartingPosition: &kinTypes.StartingPosition{Type: kinTypes.ShardIteratorTypeLatest},
399+
ConsumerARN: streamConsumer.Consumer.ConsumerARN,
400+
})
394401
if err != nil {
395402
return fmt.Errorf("cannot subscribe to shard: %w", err)
396403
}
397404

398405
k.shardReaderTomb.Go(func() error {
399-
return k.ReadFromSubscription(r.GetEventStream().Reader, out, shardID, arn.Resource[7:])
406+
return k.ReadFromSubscription(r.GetStream().Reader, out, shardID, arn.Resource[7:])
400407
})
401408
}
402409

@@ -463,11 +470,13 @@ func (k *KinesisSource) ReadFromShard(out chan types.Event, shardID string) erro
463470
logger := k.logger.WithField("shard", shardID)
464471
logger.Debugf("Starting to read shard")
465472

466-
sharIt, err := k.kClient.GetShardIterator(&kinesis.GetShardIteratorInput{
467-
ShardId: aws.String(shardID),
468-
StreamName: &k.Config.StreamName,
469-
ShardIteratorType: aws.String(kinesis.ShardIteratorTypeLatest),
470-
})
473+
sharIt, err := k.kClient.GetShardIterator(
474+
context.TODO(),
475+
&kinesis.GetShardIteratorInput{
476+
ShardId: aws.String(shardID),
477+
StreamName: &k.Config.StreamName,
478+
ShardIteratorType: kinTypes.ShardIteratorTypeLatest,
479+
})
471480
if err != nil {
472481
logger.Errorf("Cannot get shard iterator: %s", err)
473482
return fmt.Errorf("cannot get shard iterator: %w", err)
@@ -480,16 +489,17 @@ func (k *KinesisSource) ReadFromShard(out chan types.Event, shardID string) erro
480489
for {
481490
select {
482491
case <-ticker.C:
483-
records, err := k.kClient.GetRecords(&kinesis.GetRecordsInput{ShardIterator: it})
492+
records, err := k.kClient.GetRecords(context.TODO(), &kinesis.GetRecordsInput{ShardIterator: it})
493+
it = records.NextShardIterator
484494

485-
var throughputErr *kinesis.ProvisionedThroughputExceededException
495+
var throughputErr *kinTypes.ProvisionedThroughputExceededException
486496
if errors.As(err, &throughputErr) {
487497
logger.Warn("Provisioned throughput exceeded")
488498
// TODO: implement exponential backoff
489499
continue
490500
}
491501

492-
var expiredIteratorErr *kinesis.ExpiredIteratorException
502+
var expiredIteratorErr *kinTypes.ExpiredIteratorException
493503
if errors.As(err, &expiredIteratorErr) {
494504
logger.Warn("Expired iterator")
495505
continue
@@ -521,9 +531,11 @@ func (k *KinesisSource) ReadFromStream(out chan types.Event, t *tomb.Tomb) error
521531
k.logger.Info("starting kinesis acquisition from shards")
522532

523533
for {
524-
shards, err := k.kClient.ListShards(&kinesis.ListShardsInput{
525-
StreamName: aws.String(k.Config.StreamName),
526-
})
534+
shards, err := k.kClient.ListShards(
535+
context.TODO(),
536+
&kinesis.ListShardsInput{
537+
StreamName: aws.String(k.Config.StreamName),
538+
})
527539
if err != nil {
528540
return fmt.Errorf("cannot list shards: %w", err)
529541
}

pkg/acquisition/modules/kinesis/kinesis_test.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
"testing"
1111
"time"
1212

13-
"github.com/aws/aws-sdk-go/aws"
14-
"github.com/aws/aws-sdk-go/aws/session"
15-
"github.com/aws/aws-sdk-go/service/kinesis"
13+
"github.com/aws/aws-sdk-go-v2/aws"
14+
"github.com/aws/aws-sdk-go-v2/config"
15+
"github.com/aws/aws-sdk-go-v2/service/kinesis"
16+
1617
log "github.com/sirupsen/logrus"
1718
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
@@ -53,8 +54,14 @@ func GenSubObject(t *testing.T, i int) []byte {
5354
}
5455

5556
func WriteToStream(t *testing.T, endpoint string, streamName string, count int, shards int, sub bool) {
56-
sess := session.Must(session.NewSession())
57-
kinesisClient := kinesis.New(sess, aws.NewConfig().WithEndpoint(endpoint).WithRegion("us-east-1"))
57+
ctx := t.Context()
58+
59+
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("us-east-1"), config.WithCredentialsProvider(aws.AnonymousCredentials{}))
60+
require.NoError(t, err)
61+
62+
kinesisClient := kinesis.NewFromConfig(cfg, func(o *kinesis.Options) {
63+
o.BaseEndpoint = aws.String(endpoint)
64+
})
5865

5966
for i := range count {
6067
partition := "partition"
@@ -70,7 +77,7 @@ func WriteToStream(t *testing.T, endpoint string, streamName string, count int,
7077
data = []byte(strconv.Itoa(i))
7178
}
7279

73-
_, err := kinesisClient.PutRecord(&kinesis.PutRecordInput{
80+
_, err := kinesisClient.PutRecord(ctx, &kinesis.PutRecordInput{
7481
Data: data,
7582
PartitionKey: aws.String(partition),
7683
StreamName: aws.String(streamName),

pkg/acquisition/modules/s3/s3.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ func (s *S3Source) newS3Client() error {
143143

144144
var clientOpts []func(*s3.Options)
145145
if s.Config.AwsEndpoint != "" {
146-
clientOpts = append(clientOpts, func(o *s3.Options) { o.BaseEndpoint = aws.String(s.Config.AwsEndpoint) })
146+
clientOpts = append(clientOpts, func(o *s3.Options) {
147+
o.BaseEndpoint = aws.String(s.Config.AwsEndpoint)
148+
})
147149
}
148150

149151
s.s3Client = s3.NewFromConfig(cfg, clientOpts...)

0 commit comments

Comments
 (0)