@@ -16,12 +16,12 @@ import (
16
16
"time"
17
17
18
18
"github.com/aws/aws-lambda-go/events"
19
- "github.com/aws/aws-sdk-go/aws"
20
- "github.com/aws/aws-sdk-go/aws/session "
21
- "github.com/aws/aws-sdk-go/service/s3"
22
- "github.com/aws/aws-sdk-go/service /s3/s3iface "
23
- "github.com/aws/aws-sdk-go/service/sqs "
24
- "github.com/aws/aws-sdk-go/service/sqs/sqsiface "
19
+ "github.com/aws/aws-sdk-go-v2 /aws"
20
+ "github.com/aws/aws-sdk-go-v2/config "
21
+ "github.com/aws/aws-sdk-go-v2 /service/s3"
22
+ s3Manager "github.com/aws/aws-sdk-go-v2/feature /s3/manager "
23
+ s3types "github.com/aws/aws-sdk-go-v2 /service/s3/types "
24
+ "github.com/aws/aws-sdk-go-v2 /service/sqs"
25
25
yaml "github.com/goccy/go-yaml"
26
26
"github.com/prometheus/client_golang/prometheus"
27
27
log "github.com/sirupsen/logrus"
@@ -32,6 +32,16 @@ import (
32
32
"github.com/crowdsecurity/crowdsec/pkg/types"
33
33
)
34
34
35
+ type S3API interface {
36
+ s3Manager.ListObjectsV2APIClient
37
+ s3Manager.DownloadAPIClient
38
+ }
39
+
40
+ type SQSAPI interface {
41
+ ReceiveMessage (ctx context.Context , params * sqs.ReceiveMessageInput , optFns ... func (* sqs.Options )) (* sqs.ReceiveMessageOutput , error )
42
+ DeleteMessage (ctx context.Context , params * sqs.DeleteMessageInput , optFns ... func (* sqs.Options )) (* sqs.DeleteMessageOutput , error )
43
+ }
44
+
35
45
type S3Configuration struct {
36
46
configuration.DataSourceCommonCfg `yaml:",inline"`
37
47
AwsProfile * string `yaml:"aws_profile"`
@@ -51,12 +61,12 @@ type S3Source struct {
51
61
metricsLevel metrics.AcquisitionMetricsLevel
52
62
Config S3Configuration
53
63
logger * log.Entry
54
- s3Client s3iface. S3API
55
- sqsClient sqsiface. SQSAPI
64
+ s3Client S3API
65
+ sqsClient SQSAPI
56
66
readerChan chan S3Object
57
67
t * tomb.Tomb
58
68
out chan types.Event
59
- ctx aws .Context
69
+ ctx context .Context
60
70
cancel context.CancelFunc
61
71
}
62
72
@@ -109,62 +119,60 @@ const (
109
119
)
110
120
111
121
func (s * S3Source ) newS3Client () error {
112
- options := session.Options {
113
- SharedConfigState : session .SharedConfigEnable ,
114
- }
115
- if s .Config .AwsProfile != nil {
116
- options .Profile = * s .Config .AwsProfile
122
+ if s .s3Client != nil {
123
+ return nil
117
124
}
118
125
119
- sess , err := session . NewSessionWithOptions ( options )
120
- if err != nil {
121
- return fmt . Errorf ( "failed to create aws session: %w" , err )
126
+ var loadOpts [] func ( * config. LoadOptions ) error
127
+ if s . Config . AwsProfile != nil && * s . Config . AwsProfile != "" {
128
+ loadOpts = append ( loadOpts , config . WithSharedConfigProfile ( * s . Config . AwsProfile ) )
122
129
}
123
-
124
- config := aws .NewConfig ()
125
- if s .Config .AwsRegion != "" {
126
- config = config .WithRegion (s .Config .AwsRegion )
130
+ region := s .Config .AwsRegion
131
+ if region == "" {
132
+ region = "us-east-1"
127
133
}
128
- if s .Config .AwsEndpoint != "" {
129
- config = config .WithEndpoint (s .Config .AwsEndpoint )
134
+ loadOpts = append (loadOpts , config .WithRegion (region ))
135
+ loadOpts = append (loadOpts , config .WithCredentialsProvider (aws.AnonymousCredentials {}))
136
+ cfg , err := config .LoadDefaultConfig (s .ctx , loadOpts ... )
137
+ if err != nil {
138
+ return fmt .Errorf ("failed to load aws config: %w" , err )
130
139
}
131
140
132
- s . s3Client = s3 .New ( sess , config )
133
- if s .s3Client == nil {
134
- return errors . New ( "failed to create S3 client" )
141
+ var clientOpts [] func ( * s3.Options )
142
+ if s .Config . AwsEndpoint != "" {
143
+ clientOpts = append ( clientOpts , func ( o * s3. Options ) { o . BaseEndpoint = aws . String ( s . Config . AwsEndpoint ) } )
135
144
}
136
145
146
+ s .s3Client = s3 .NewFromConfig (cfg , clientOpts ... )
137
147
return nil
138
148
}
139
149
140
150
func (s * S3Source ) newSQSClient () error {
141
- var sess * session.Session
142
-
143
- if s .Config .AwsProfile != nil {
144
- sess = session .Must (session .NewSessionWithOptions (session.Options {
145
- SharedConfigState : session .SharedConfigEnable ,
146
- Profile : * s .Config .AwsProfile ,
147
- }))
148
- } else {
149
- sess = session .Must (session .NewSessionWithOptions (session.Options {
150
- SharedConfigState : session .SharedConfigEnable ,
151
- }))
151
+ if s .sqsClient != nil {
152
+ return nil
152
153
}
153
154
154
- if sess == nil {
155
- return errors .New ("failed to create aws session" )
155
+ var loadOpts []func (* config.LoadOptions ) error
156
+ if s .Config .AwsProfile != nil && * s .Config .AwsProfile != "" {
157
+ loadOpts = append (loadOpts , config .WithSharedConfigProfile (* s .Config .AwsProfile ))
156
158
}
157
- config := aws . NewConfig ()
158
- if s . Config . AwsRegion ! = "" {
159
- config = config . WithRegion ( s . Config . AwsRegion )
159
+ region := s . Config . AwsRegion
160
+ if region = = "" {
161
+ region = "us-east-1"
160
162
}
161
- if s .Config .AwsEndpoint != "" {
162
- config = config .WithEndpoint (s .Config .AwsEndpoint )
163
+ loadOpts = append (loadOpts , config .WithRegion (region ))
164
+ loadOpts = append (loadOpts , config .WithCredentialsProvider (aws.AnonymousCredentials {}))
165
+ cfg , err := config .LoadDefaultConfig (s .ctx , loadOpts ... )
166
+ if err != nil {
167
+ return fmt .Errorf ("failed to load aws config: %w" , err )
163
168
}
164
- s .sqsClient = sqs .New (sess , config )
165
- if s .sqsClient == nil {
166
- return errors .New ("failed to create SQS client" )
169
+
170
+ var clientOpts []func (* sqs.Options )
171
+ if s .Config .AwsEndpoint != "" {
172
+ clientOpts = append (clientOpts , func (o * sqs.Options ) { o .BaseEndpoint = aws .String (s .Config .AwsEndpoint ) })
167
173
}
174
+
175
+ s .sqsClient = sqs .NewFromConfig (cfg , clientOpts ... )
168
176
return nil
169
177
}
170
178
@@ -186,13 +194,13 @@ func (s *S3Source) readManager() {
186
194
}
187
195
}
188
196
189
- func (s * S3Source ) getBucketContent () ([]* s3 .Object , error ) {
197
+ func (s * S3Source ) getBucketContent () ([]s3types .Object , error ) {
190
198
logger := s .logger .WithField ("method" , "getBucketContent" )
191
199
logger .Debugf ("Getting bucket content for %s" , s .Config .BucketName )
192
- bucketObjects := make ([]* s3 .Object , 0 )
200
+ bucketObjects := make ([]s3types .Object , 0 )
193
201
var continuationToken * string
194
202
for {
195
- out , err := s .s3Client .ListObjectsV2WithContext (s .ctx , & s3.ListObjectsV2Input {
203
+ out , err := s .s3Client .ListObjectsV2 (s .ctx , & s3.ListObjectsV2Input {
196
204
Bucket : aws .String (s .Config .BucketName ),
197
205
Prefix : aws .String (s .Config .Prefix ),
198
206
ContinuationToken : continuationToken ,
@@ -340,10 +348,10 @@ func (s *S3Source) sqsPoll() error {
340
348
return nil
341
349
default :
342
350
logger .Trace ("Polling SQS queue" )
343
- out , err := s .sqsClient .ReceiveMessageWithContext (s .ctx , & sqs.ReceiveMessageInput {
351
+ out , err := s .sqsClient .ReceiveMessage (s .ctx , & sqs.ReceiveMessageInput {
344
352
QueueUrl : aws .String (s .Config .SQSName ),
345
- MaxNumberOfMessages : aws . Int64 ( 10 ) ,
346
- WaitTimeSeconds : aws . Int64 ( 20 ) , // Probably no need to make it configurable ?
353
+ MaxNumberOfMessages : 10 ,
354
+ WaitTimeSeconds : 20 , // Probably no need to make it configurable ?
347
355
})
348
356
if err != nil {
349
357
logger .Errorf ("Error while polling SQS: %s" , err )
@@ -359,21 +367,23 @@ func (s *S3Source) sqsPoll() error {
359
367
if err != nil {
360
368
logger .Errorf ("Error while parsing SQS message: %s" , err )
361
369
// Always delete the message to avoid infinite loop
362
- _ , err = s .sqsClient .DeleteMessage (& sqs.DeleteMessageInput {
363
- QueueUrl : aws .String (s .Config .SQSName ),
364
- ReceiptHandle : message .ReceiptHandle ,
365
- })
370
+ _ , err = s .sqsClient .DeleteMessage (s .ctx ,
371
+ & sqs.DeleteMessageInput {
372
+ QueueUrl : aws .String (s .Config .SQSName ),
373
+ ReceiptHandle : message .ReceiptHandle ,
374
+ })
366
375
if err != nil {
367
376
logger .Errorf ("Error while deleting SQS message: %s" , err )
368
377
}
369
378
continue
370
379
}
371
380
logger .Debugf ("Received SQS message for object %s/%s" , bucket , key )
372
381
s .readerChan <- S3Object {Key : key , Bucket : bucket }
373
- _ , err = s .sqsClient .DeleteMessage (& sqs.DeleteMessageInput {
374
- QueueUrl : aws .String (s .Config .SQSName ),
375
- ReceiptHandle : message .ReceiptHandle ,
376
- })
382
+ _ , err = s .sqsClient .DeleteMessage (s .ctx ,
383
+ & sqs.DeleteMessageInput {
384
+ QueueUrl : aws .String (s .Config .SQSName ),
385
+ ReceiptHandle : message .ReceiptHandle ,
386
+ })
377
387
if err != nil {
378
388
logger .Errorf ("Error while deleting SQS message: %s" , err )
379
389
}
@@ -393,7 +403,7 @@ func (s *S3Source) readFile(bucket string, key string) error {
393
403
"key" : key ,
394
404
})
395
405
396
- output , err := s .s3Client .GetObjectWithContext (s .ctx , & s3.GetObjectInput {
406
+ output , err := s .s3Client .GetObject (s .ctx , & s3.GetObjectInput {
397
407
Bucket : aws .String (bucket ),
398
408
Key : aws .String (key ),
399
409
})
0 commit comments