From ed9f2913d21221f37c3560c9c94a1f0a9a81ffce Mon Sep 17 00:00:00 2001 From: Sho Okada Date: Wed, 22 Apr 2020 18:57:34 +0900 Subject: [PATCH] Add s3SinkEndpoint option Signed-off-by: Sho Okada --- sinks/interfaces.go | 4 +++- sinks/s3sink.go | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sinks/interfaces.go b/sinks/interfaces.go index c57098f7d..2c3f154f9 100644 --- a/sinks/interfaces.go +++ b/sinks/interfaces.go @@ -104,6 +104,8 @@ func ManufactureSink() (e EventSinkInterface) { panic("s3 sink specified but s3SinkBucketDir not specified") } + endpoint := viper.GetString("s3SinkEndpoint") + // By default the json is pushed to s3 in not flatenned rfc5424 write format // The option to write to s3 is in the flattened json format which will help in // using the data in redshift with least effort @@ -124,7 +126,7 @@ func ManufactureSink() (e EventSinkInterface) { bufferSize := viper.GetInt("s3SinkBufferSize") overflow := viper.GetBool("s3SinkDiscardMessages") - s, err := NewS3Sink(accessKeyID, secretAccessKey, region, bucket, bucketDir, uploadInterval, overflow, bufferSize, outputFormat) + s, err := NewS3Sink(accessKeyID, secretAccessKey, region, bucket, bucketDir, uploadInterval, overflow, bufferSize, outputFormat, endpoint) if err != nil { panic(err.Error()) } diff --git a/sinks/s3sink.go b/sinks/s3sink.go index d8c6c36e1..7963e508f 100644 --- a/sinks/s3sink.go +++ b/sinks/s3sink.go @@ -54,12 +54,16 @@ type S3Sink struct { } // NewS3Sink is the factory method constructing a new S3Sink -func NewS3Sink(awsAccessKeyID string, s3SinkSecretAccessKey string, s3SinkRegion string, s3SinkBucket string, s3SinkBucketDir string, s3SinkUploadInterval int, overflow bool, bufferSize int, outputFormat string) (*S3Sink, error) { +func NewS3Sink(awsAccessKeyID string, s3SinkSecretAccessKey string, s3SinkRegion string, s3SinkBucket string, s3SinkBucketDir string, s3SinkUploadInterval int, overflow bool, bufferSize int, outputFormat string, endpoint string) (*S3Sink, error) { awsConfig := &aws.Config{ Region: aws.String(s3SinkRegion), Credentials: credentials.NewStaticCredentials(awsAccessKeyID, s3SinkSecretAccessKey, ""), } + if endpoint != "" { + awsConfig = awsConfig.WithEndpoint(endpoint) + } + awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true) sess, err := session.NewSession(awsConfig) if err != nil {