Skip to content
This repository has been archived by the owner on Mar 15, 2022. It is now read-only.

Commit

Permalink
Add s3SinkEndpoint option
Browse files Browse the repository at this point in the history
Signed-off-by: Sho Okada <[email protected]>
  • Loading branch information
shokada committed Apr 22, 2020
1 parent eec9229 commit 9dfa29c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
4 changes: 3 additions & 1 deletion sinks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func ManufactureSink() (e EventSinkInterface) {
panic("s3 sink specified but s3SinkBucketDir not specified")
}

endpoint := viper.GetString("s3SinkEndpoint")

// By default the json is pushed to s3 in not flatenned rfc5424 write format
// The option to write to s3 is in the flattened json format which will help in
// using the data in redshift with least effort
Expand All @@ -124,7 +126,7 @@ func ManufactureSink() (e EventSinkInterface) {
bufferSize := viper.GetInt("s3SinkBufferSize")
overflow := viper.GetBool("s3SinkDiscardMessages")

s, err := NewS3Sink(accessKeyID, secretAccessKey, region, bucket, bucketDir, uploadInterval, overflow, bufferSize, outputFormat)
s, err := NewS3Sink(accessKeyID, secretAccessKey, region, bucket, bucketDir, uploadInterval, overflow, bufferSize, outputFormat, endpoint)
if err != nil {
panic(err.Error())
}
Expand Down
6 changes: 5 additions & 1 deletion sinks/s3sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ type S3Sink struct {
}

// NewS3Sink is the factory method constructing a new S3Sink
func NewS3Sink(awsAccessKeyID string, s3SinkSecretAccessKey string, s3SinkRegion string, s3SinkBucket string, s3SinkBucketDir string, s3SinkUploadInterval int, overflow bool, bufferSize int, outputFormat string) (*S3Sink, error) {
func NewS3Sink(awsAccessKeyID string, s3SinkSecretAccessKey string, s3SinkRegion string, s3SinkBucket string, s3SinkBucketDir string, s3SinkUploadInterval int, overflow bool, bufferSize int, outputFormat string, s3SinkEndpoint string) (*S3Sink, error) {
awsConfig := &aws.Config{
Region: aws.String(s3SinkRegion),
Credentials: credentials.NewStaticCredentials(awsAccessKeyID, s3SinkSecretAccessKey, ""),
}

if s3SinkEndpoint != "" {
awsConfig = awsConfig.WithEndpoint(s3SinkEndpoint)
}

awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true)
sess, err := session.NewSession(awsConfig)
if err != nil {
Expand Down

0 comments on commit 9dfa29c

Please sign in to comment.