Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to aws sdk v2 #83

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions basic/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ type Metric struct {
}

type Collector struct {
config *config.Config
sessions *sessions.Sessions
metrics []Metric
l log.Logger
config *config.Config
awsConfigs *sessions.Configs
metrics []Metric
l log.Logger
}

// New creates a new instance of a Collector.
func New(config *config.Config, sessions *sessions.Sessions) *Collector {
func New(config *config.Config, awsConfigs *sessions.Configs) *Collector {
return &Collector{
config: config,
sessions: sessions,
metrics: Metrics,
l: log.With("component", "basic"),
config: config,
awsConfigs: awsConfigs,
metrics: Metrics,
l: log.With("component", "basic"),
}
}

Expand Down
39 changes: 20 additions & 19 deletions basic/scraper.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package basic

import (
"context"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
cloudwatchtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/prometheus/client_golang/prometheus"

"github.com/percona/rds_exporter/config"
Expand All @@ -24,17 +26,17 @@ type Scraper struct {
ch chan<- prometheus.Metric

// internal
svc *cloudwatch.CloudWatch
svc *cloudwatch.Client
constLabels prometheus.Labels
}

func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prometheus.Metric) *Scraper {
// Create CloudWatch client
sess, _ := collector.sessions.GetSession(instance.Region, instance.Instance)
if sess == nil {
awsConfig, _ := collector.awsConfigs.GetSession(instance.Region, instance.Instance)
if awsConfig == nil {
return nil
}
svc := cloudwatch.New(sess)
svc := cloudwatch.NewFromConfig(*awsConfig)

constLabels := prometheus.Labels{
"region": instance.Region,
Expand All @@ -60,12 +62,12 @@ func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prome
}
}

func getLatestDatapoint(datapoints []*cloudwatch.Datapoint) *cloudwatch.Datapoint {
var latest *cloudwatch.Datapoint = nil
func getLatestDatapoint(datapoints []cloudwatchtypes.Datapoint) *cloudwatchtypes.Datapoint {
var latest *cloudwatchtypes.Datapoint = nil

for dp := range datapoints {
if latest == nil || latest.Timestamp.Before(*datapoints[dp].Timestamp) {
latest = datapoints[dp]
latest = &datapoints[dp]
}
}

Expand Down Expand Up @@ -94,26 +96,25 @@ func (s *Scraper) Scrape() {
func (s *Scraper) scrapeMetric(metric Metric) error {
now := time.Now()
end := now.Add(-Delay)
period := int32(Period.Seconds())

params := &cloudwatch.GetMetricStatisticsInput{
EndTime: aws.Time(end),
StartTime: aws.Time(end.Add(-Range)),

Period: aws.Int64(int64(Period.Seconds())),
EndTime: aws.Time(end),
StartTime: aws.Time(end.Add(-Range)),
Period: &period,
MetricName: aws.String(metric.cwName),
Namespace: aws.String("AWS/RDS"),
Dimensions: []*cloudwatch.Dimension{},
Statistics: aws.StringSlice([]string{"Average"}),
Unit: nil,
Dimensions: []cloudwatchtypes.Dimension{},
Statistics: []cloudwatchtypes.Statistic{"Average"},
}

params.Dimensions = append(params.Dimensions, &cloudwatch.Dimension{
params.Dimensions = append(params.Dimensions, cloudwatchtypes.Dimension{
Name: aws.String("DBInstanceIdentifier"),
Value: aws.String(s.instance.Instance),
})

// Call CloudWatch to gather the datapoints
resp, err := s.svc.GetMetricStatistics(params)
resp, err := s.svc.GetMetricStatistics(context.TODO(), params)
if err != nil {
return err
}
Expand All @@ -127,7 +128,7 @@ func (s *Scraper) scrapeMetric(metric Metric) error {
dp := getLatestDatapoint(resp.Datapoints)

// Get the metric.
v := aws.Float64Value(dp.Average)
v := *dp.Average
switch metric.cwName {
case "EngineUptime":
// "Fake EngineUptime -> node_boot_time with time.Now().Unix() - EngineUptime."
Expand Down
6 changes: 3 additions & 3 deletions enhanced/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// Collector collects enhanced RDS metrics by utilizing several scrapers.
type Collector struct {
sessions *sessions.Sessions
sessions *sessions.Configs
logger log.Logger

rw sync.RWMutex
Expand All @@ -27,14 +27,14 @@ const (
)

// NewCollector creates new collector and starts scrapers.
func NewCollector(sessions *sessions.Sessions) *Collector {
func NewCollector(sessions *sessions.Configs) *Collector {
c := &Collector{
sessions: sessions,
logger: log.With("component", "enhanced"),
metrics: make(map[string][]prometheus.Metric),
}

for session, instances := range sessions.AllSessions() {
for session, instances := range sessions.AllConfigs() {
s := newScraper(session, instances)

interval := maxInterval
Expand Down
109 changes: 53 additions & 56 deletions enhanced/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"

Expand All @@ -18,14 +17,14 @@ import (
type scraper struct {
instances []sessions.Instance
logStreamNames []string
svc *cloudwatchlogs.CloudWatchLogs
svc *cloudwatchlogs.Client
nextStartTime time.Time
logger log.Logger

testDisallowUnknownFields bool // for tests only
}

func newScraper(session *session.Session, instances []sessions.Instance) *scraper {
func newScraper(config *aws.Config, instances []sessions.Instance) *scraper {
logStreamNames := make([]string, 0, len(instances))
for _, instance := range instances {
logStreamNames = append(logStreamNames, instance.ResourceID)
Expand All @@ -34,7 +33,7 @@ func newScraper(session *session.Session, instances []sessions.Instance) *scrape
return &scraper{
instances: instances,
logStreamNames: logStreamNames,
svc: cloudwatchlogs.New(session),
svc: cloudwatchlogs.NewFromConfig(*config),
nextStartTime: time.Now().Add(-3 * time.Minute).Round(0), // strip monotonic clock reading
logger: log.With("component", "enhanced"),
}
Expand Down Expand Up @@ -78,68 +77,66 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m

input := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: aws.String("RDSOSMetrics"),
LogStreamNames: aws.StringSlice(s.logStreamNames[sliceStart:sliceEnd]),
StartTime: aws.Int64(aws.TimeUnixMilli(s.nextStartTime)),
LogStreamNames: s.logStreamNames[sliceStart:sliceEnd],
StartTime: aws.Int64(s.nextStartTime.UnixMilli()),
}

s.logger.With("next_start", s.nextStartTime.UTC()).With("since_last", time.Since(s.nextStartTime)).Debugf("Requesting metrics")

// collect all returned events and metrics/messages
collectAllMetrics := func(output *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
for _, event := range output.Events {
l := s.logger.With("EventId", *event.EventId).With("LogStreamName", *event.LogStreamName)
l = l.With("Timestamp", aws.MillisecondsTimeValue(event.Timestamp).UTC())
l = l.With("IngestionTime", aws.MillisecondsTimeValue(event.IngestionTime).UTC())

var instance *sessions.Instance
for _, i := range s.instances {
if i.ResourceID == *event.LogStreamName {
instance = &i
break
}
}
if instance == nil {
l.Errorf("Failed to find instance.")
continue
}
output, err := s.svc.FilterLogEvents(ctx, input)
if err != nil {
s.logger.Errorf("Failed to filter log events: %s.", err)
}

if instance.DisableEnhancedMetrics {
l.Debugf("Enhanced Metrics are disabled for instance %v.", instance)
continue
}
l = l.With("region", instance.Region).With("instance", instance.Instance)

// l.Debugf("Message:\n%s", *event.Message)
osMetrics, err := parseOSMetrics([]byte(*event.Message), s.testDisallowUnknownFields)
if err != nil {
// only for tests
if s.testDisallowUnknownFields {
panic(fmt.Sprintf("New metrics should be added: %s", err))
}

l.Errorf("Failed to parse metrics: %s.", err)
continue
}
// l.Debugf("OS Metrics:\n%#v", osMetrics)
for _, event := range output.Events {
l := s.logger.With("EventId", *event.EventId).With("LogStreamName", *event.LogStreamName)

timestamp := aws.MillisecondsTimeValue(event.Timestamp).UTC()
l.Debugf("Timestamp from message: %s; from event: %s.", osMetrics.Timestamp.UTC(), timestamp)
l = l.With("Timestamp", time.Unix(*event.Timestamp, 0).UTC())
l = l.With("IngestionTime", time.Unix(*event.IngestionTime, 0).UTC())

if allMetrics[instance.ResourceID] == nil {
allMetrics[instance.ResourceID] = make(map[time.Time][]prometheus.Metric)
var instance *sessions.Instance
for _, i := range s.instances {
if i.ResourceID == *event.LogStreamName {
instance = &i
break
}
allMetrics[instance.ResourceID][timestamp] = osMetrics.makePrometheusMetrics(instance.Region, instance.Labels)
}
if instance == nil {
l.Errorf("Failed to find instance.")
continue
}

if allMessages[instance.ResourceID] == nil {
allMessages[instance.ResourceID] = make(map[time.Time]string)
if instance.DisableEnhancedMetrics {
l.Debugf("Enhanced Metrics are disabled for instance %v.", instance)
continue
}
l = l.With("region", instance.Region).With("instance", instance.Instance)

// l.Debugf("Message:\n%s", *event.Message)
osMetrics, err := parseOSMetrics([]byte(*event.Message), s.testDisallowUnknownFields)
if err != nil {
// only for tests
if s.testDisallowUnknownFields {
panic(fmt.Sprintf("New metrics should be added: %s", err))
}
allMessages[instance.ResourceID][timestamp] = *event.Message

l.Errorf("Failed to parse metrics: %s.", err)
continue
}
// l.Debugf("OS Metrics:\n%#v", osMetrics)

return true // continue pagination
}
if err := s.svc.FilterLogEventsPagesWithContext(ctx, input, collectAllMetrics); err != nil {
s.logger.Errorf("Failed to filter log events: %s.", err)
timestamp := time.Unix(*event.Timestamp, 0).UTC()
l.Debugf("Timestamp from message: %s; from event: %s.", osMetrics.Timestamp.UTC(), timestamp)

if allMetrics[instance.ResourceID] == nil {
allMetrics[instance.ResourceID] = make(map[time.Time][]prometheus.Metric)
}
allMetrics[instance.ResourceID][timestamp] = osMetrics.makePrometheusMetrics(instance.Region, instance.Labels)

if allMessages[instance.ResourceID] == nil {
allMessages[instance.ResourceID] = make(map[time.Time]string)
}
allMessages[instance.ResourceID][timestamp] = *event.Message
}
}
// get better times
Expand Down
12 changes: 6 additions & 6 deletions enhanced/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func TestScraper(t *testing.T) {
sess, err := sessions.New(cfg.Instances, client.HTTP(), false)
require.NoError(t, err)

for session, instances := range sess.AllSessions() {
session, instances := session, instances
for config, instances := range sess.AllConfigs() {
config, instances := config, instances
t.Run(fmt.Sprint(instances), func(t *testing.T) {
// test that there are no new metrics
s := newScraper(session, instances)
s := newScraper(config, instances)
s.testDisallowUnknownFields = true
metrics, messages := s.scrape(context.Background())
require.Len(t, metrics, len(instances))
Expand Down Expand Up @@ -159,10 +159,10 @@ func TestScraperDisableEnhancedMetrics(t *testing.T) {
return false
}

for session, instances := range sess.AllSessions() {
session, instances := session, instances
for config, instances := range sess.AllConfigs() {
config, instances := config, instances
t.Run(fmt.Sprint(instances), func(t *testing.T) {
s := newScraper(session, instances)
s := newScraper(config, instances)
s.testDisallowUnknownFields = true
metrics, _ := s.scrape(context.Background())

Expand Down
17 changes: 15 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ module github.com/percona/rds_exporter
go 1.17

require (
github.com/aws/aws-sdk-go v1.36.30
github.com/aws/aws-sdk-go-v2 v1.17.3
github.com/aws/aws-sdk-go-v2/config v1.18.7
github.com/aws/aws-sdk-go-v2/credentials v1.13.7
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.23.1
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.17.3
github.com/aws/aws-sdk-go-v2/service/rds v1.38.0
github.com/aws/aws-sdk-go-v2/service/sts v1.17.7
github.com/percona/exporter_shared v0.7.3
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/common v0.24.0
Expand All @@ -15,6 +21,14 @@ require (
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.28 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.11 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -28,7 +42,6 @@ require (
github.com/prometheus/procfs v0.6.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
Expand Down
Loading