Skip to content

Commit

Permalink
s3mapprovider for Collector: load configuration from config files in …
Browse files Browse the repository at this point in the history
…Amazon S3 (open-telemetry#14317)

* code implementation of s3mapprovider

* collector version and codeowners

* Update .github/ALLOWLIST

Co-authored-by: Pablo Baeyens <[email protected]>

* collector version updated

* collector version updated

Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
rice-junhaoyu and mx-psi authored Sep 26, 2022
1 parent e239898 commit 1d88657
Show file tree
Hide file tree
Showing 10 changed files with 822 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ cmd/otelcontribcol/ @open-telemetry/collector-c
cmd/telemetrygen/ @open-telemetry/collector-contrib-approvers @mx-psi @amenasria @codeboten
cmd/tracegen/ @open-telemetry/collector-contrib-approvers @jpkrohling

confmap/provider/s3mapprovider/ @open-telemetry/collector-contrib-approvers @Aneurysm9

exporter/alibabacloudlogserviceexporter/ @open-telemetry/collector-contrib-approvers @shabicheng @kongluoxing @qiansheng91
exporter/awscloudwatchlogsexporter/ @open-telemetry/collector-contrib-approvers @boostchicken
exporter/awsemfexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @shaochengwang @mxiamxia
Expand Down
1 change: 1 addition & 0 deletions confmap/provider/s3mapprovider/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
12 changes: 12 additions & 0 deletions confmap/provider/s3mapprovider/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
## Summary
This package provides a `ConfigMapProvider` implementation for Amazon S3 (`s3mapprovider`) that allows the Collector the ability to load configuration by fetching and reading config objects stored in Amazon S3.
## How it works
- It will be called by `ConfigMapResolver` to load configuration for the Collector.
- By giving a config URI starting with prefix `s3://`, this `s3mapprovider` will be used to download config objects from the given S3 URIs, and then use the downloaded configuration during Collector initialization.

Expected URI format:
- s3://[BUCKET].s3.[REGION].amazonaws.com/[KEY]

Prerequistes:
- Need to setup access keys from IAM console (aws_access_key_id and aws_secret_access_key) with permission to access Amazon S3
- For details, can take a look at https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/
36 changes: 36 additions & 0 deletions confmap/provider/s3mapprovider/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3mapprovider

go 1.18

require (
github.com/aws/aws-sdk-go-v2 v1.11.0
github.com/aws/aws-sdk-go-v2/config v1.10.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.60.1-0.20220923151520-96e9af35c002
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.10.0 // indirect
github.com/aws/smithy-go v1.9.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/knadh/koanf v1.4.3 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
429 changes: 429 additions & 0 deletions confmap/provider/s3mapprovider/go.sum

Large diffs are not rendered by default.

138 changes: 138 additions & 0 deletions confmap/provider/s3mapprovider/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package s3mapprovider // import "github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3mapprovider"

import (
"context"
"fmt"
"net/url"
"regexp"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"go.opentelemetry.io/collector/confmap"
"gopkg.in/yaml.v2"
)

const (
schemeName = "s3"
)

type s3Client interface {
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
}

type provider struct {
client s3Client
}

// New returns a new confmap.Provider that reads the configuration from a file.
//
// This Provider supports "s3" scheme, and can be called with a "uri" that follows:
//
// s3-uri : s3://[BUCKET].s3.[REGION].amazonaws.com/[KEY]
//
// One example for s3-uri be like: s3://DOC-EXAMPLE-BUCKET.s3.us-west-2.amazonaws.com/photos/puppy.jpg
//
// Examples:
// `s3://DOC-EXAMPLE-BUCKET.s3.us-west-2.amazonaws.com/photos/puppy.jpg` - (unix, windows)
func New() confmap.Provider {
return &provider{client: nil}
}

func (fmp *provider) Retrieve(ctx context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
if !strings.HasPrefix(uri, schemeName+":") {
return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName)
}

// initialize the s3 client in the first call of Retrieve
if fmp.client == nil {
cfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to load configurations to initialize an AWS SDK client, error: %w", err)
}
fmp.client = s3.NewFromConfig(cfg)
}

// Split the uri and get [BUCKET], [REGION], [KEY]
bucket, region, key, err := s3URISplit(uri)
if err != nil {
return nil, fmt.Errorf("%q uri is not valid s3-url: %w", uri, err)
}

// s3 downloading
resp, err := fmp.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
}, func(o *s3.Options) {
o.Region = region
})
if err != nil {
return nil, fmt.Errorf("file in S3 failed to fetch uri %q: %w", uri, err)
}

// read config from response body
dec := yaml.NewDecoder(resp.Body)
defer resp.Body.Close()
var conf map[string]interface{}
err = dec.Decode(&conf)
if err != nil {
return nil, err
}
return confmap.NewRetrieved(conf)
}

func (*provider) Scheme() string {
return schemeName
}

func (*provider) Shutdown(context.Context) error {
return nil
}

// S3URISplit splits the s3 uri and get the [BUCKET], [REGION], [KEY] in it
// INPUT : s3 uri (like s3://[BUCKET].s3.[REGION].amazonaws.com/[KEY])
// OUTPUT :
// - [BUCKET] : The name of a bucket in Amazon S3.
// - [REGION] : Where are servers from, e.g. us-west-2.
// - [KEY] : The key exists in a given bucket, can be used to retrieve a file.
func s3URISplit(uri string) (string, string, string, error) {
// check whether the pattern of s3-uri is correct
matched, err := regexp.MatchString(`s3:\/\/(.*)\.s3\.(.*).amazonaws\.com\/(.*)`, uri)
if !matched || err != nil {
return "", "", "", fmt.Errorf("invalid s3-uri using a wrong pattern")
}
// parse the uri as [scheme:][//[userinfo@]host][/]path[?query][#fragment], then extract components from
u, err := url.Parse(uri)
if err != nil {
return "", "", "", fmt.Errorf("failed to change the s3-uri to url.URL: %w", err)
}
// extract components
key := strings.TrimPrefix(u.Path, "/")
host := u.Host
hostSplitted := strings.Split(host, ".")
if len(hostSplitted) < 5 {
return "", "", "", fmt.Errorf("invalid host in the s3-uri")
}
bucket := hostSplitted[0]
region := hostSplitted[2]
// check empty fields
if bucket == "" || region == "" || key == "" {
return "", "", "", fmt.Errorf("invalid s3-uri with empty fields")
}
return bucket, region, key, nil
}
150 changes: 150 additions & 0 deletions confmap/provider/s3mapprovider/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package s3mapprovider

import (
"bytes"
"context"
"fmt"
"io"
"os"
"testing"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"
)

// A s3 client mocking s3mapprovider works in normal cases
type testClient struct{}

// Implement GetObject() for testClient in normal cases
func (client *testClient) GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
// read local config file and return
f, err := os.ReadFile("./testdata/otel-config.yaml")
if err != nil {
return &s3.GetObjectOutput{}, err
}
return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(f)), ContentLength: (int64)(len(f))}, nil
}

// Create a provider mocking s3mapprovider works in normal cases
func NewTestProvider() confmap.Provider {
return &provider{client: &testClient{}}
}

// A s3 client mocking s3mapprovider works when there is no corresponding config file according to the given s3-uri
type testNonExistClient struct{}

// Create a provider mocking s3mapprovider works when there is no corresponding config file according to the given s3-uri
func NewTestNonExistProvider() confmap.Provider {
return &provider{client: &testNonExistClient{}}
}

// Implement GetObject() for testClient when there is no corresponding config file according to the given s3-uri
func (client *testNonExistClient) GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
// read local config file and return
f, err := os.ReadFile("./testdata/nonexist-otel-config.yaml")
if err != nil {
return &s3.GetObjectOutput{}, err
}
return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(f)), ContentLength: (int64)(len(f))}, nil
}

// A s3 client mocking s3mapprovider works when the returned config file is invalid
type testInvalidClient struct{}

// Create a provider mocking s3mapprovider works when the returned config file is invalid
func NewTestInvalidProvider() confmap.Provider {
return &provider{client: &testInvalidClient{}}
}

// Implement GetObject() for testClient when the returned config file is invalid
func (client *testInvalidClient) GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
// read local config file and return
return &s3.GetObjectOutput{}, fmt.Errorf("the downloaded config file")
}

func TestFunctionalityDownloadFileS3(t *testing.T) {
fp := NewTestProvider()
_, err := fp.Retrieve(context.Background(), "s3://bucket.s3.region.amazonaws.com/key", nil)
assert.NoError(t, err)
assert.NoError(t, fp.Shutdown(context.Background()))
}

func TestFunctionalityS3URISplit(t *testing.T) {
fp := NewTestProvider()
bucket, region, key, err := s3URISplit("s3://bucket.s3.region.amazonaws.com/key")
assert.NoError(t, err)
assert.Equal(t, "bucket", bucket)
assert.Equal(t, "region", region)
assert.Equal(t, "key", key)
assert.NoError(t, fp.Shutdown(context.Background()))
}

func TestInvalidS3URISplit(t *testing.T) {
fp := NewTestProvider()
_, err := fp.Retrieve(context.Background(), "s3://bucket.s3.region.amazonaws", nil)
assert.Error(t, err)
_, err = fp.Retrieve(context.Background(), "s3://bucket.s3.region.aws.com/key", nil)
assert.Error(t, err)
require.NoError(t, fp.Shutdown(context.Background()))
}

func TestUnsupportedScheme(t *testing.T) {
fp := NewTestProvider()
_, err := fp.Retrieve(context.Background(), "https://google.com", nil)
assert.Error(t, err)
assert.NoError(t, fp.Shutdown(context.Background()))
}

func TestEmptyBucket(t *testing.T) {
fp := NewTestProvider()
_, err := fp.Retrieve(context.Background(), "s3://.s3.region.amazonaws.com/key", nil)
require.Error(t, err)
require.NoError(t, fp.Shutdown(context.Background()))
}

func TestEmptyKey(t *testing.T) {
fp := NewTestProvider()
_, err := fp.Retrieve(context.Background(), "s3://bucket.s3.region.amazonaws.com/", nil)
require.Error(t, err)
require.NoError(t, fp.Shutdown(context.Background()))
}

func TestNonExistent(t *testing.T) {
fp := NewTestNonExistProvider()
_, err := fp.Retrieve(context.Background(), "s3://non-exist-bucket.s3.region.amazonaws.com/key", nil)
assert.Error(t, err)
_, err = fp.Retrieve(context.Background(), "s3://bucket.s3.region.amazonaws.com/non-exist-key.yaml", nil)
assert.Error(t, err)
_, err = fp.Retrieve(context.Background(), "s3://bucket.s3.non-exist-region.amazonaws.com/key", nil)
assert.Error(t, err)
require.NoError(t, fp.Shutdown(context.Background()))
}

func TestInvalidYAML(t *testing.T) {
fp := NewTestInvalidProvider()
_, err := fp.Retrieve(context.Background(), "s3://bucket.s3.region.amazonaws.com/key", nil)
assert.Error(t, err)
require.NoError(t, fp.Shutdown(context.Background()))
}

func TestScheme(t *testing.T) {
fp := NewTestProvider()
assert.Equal(t, "s3", fp.Scheme())
require.NoError(t, fp.Shutdown(context.Background()))
}
37 changes: 37 additions & 0 deletions confmap/provider/s3mapprovider/testdata/otel-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
extensions:
memory_ballast:
size_mib: 512
zpages:
endpoint: 0.0.0.0:55679

receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:
memory_limiter:
# 75% of maximum memory up to 4G
limit_mib: 1536
# 25% of limit up to 2G
spike_limit_mib: 512
check_interval: 5s

exporters:
logging:
loglevel: debug

service:
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [logging]
metrics:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [logging]

extensions: [memory_ballast, zpages]
Loading

0 comments on commit 1d88657

Please sign in to comment.