Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Main (unreleased)

- (_Experimental_) Additions to experimental `database_observability.mysql` component:
- `explain_plans` collector now changes schema before returning the connection to the pool (@cristiangreco)
- metrics now include cloud provider labels when configured (@matthewnolf)

- (_Experimental_) Additions to experimental `database_observability.postgres` component:
- `explain_plans` added the explain plan collector (@rgeyer)
Expand Down
56 changes: 54 additions & 2 deletions internal/component/database_observability/cloud_provider.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,63 @@
package database_observability

import "github.com/aws/aws-sdk-go-v2/aws/arn"
import (
"fmt"
"net"
"regexp"
"strings"

"github.com/aws/aws-sdk-go-v2/aws/arn"
"github.com/go-sql-driver/mysql"
)

var (
rdsRegex = regexp.MustCompile(`(?P<identifier>[^\.]+)\.([^\.]+)\.(?P<region>[^\.]+)\.rds\.amazonaws\.com`)
azureRegex = regexp.MustCompile(`(?P<identifier>[^\.]+)\.mysql\.database\.azure\.com`)
)

type CloudProvider struct {
AWS *AWSCloudProviderInfo
AWS *AWSCloudProviderInfo
Azure *AzureCloudProviderInfo
}

type AWSCloudProviderInfo struct {
ARN arn.ARN
}

type AzureCloudProviderInfo struct {
Resource string
}

func PopulateCloudProvider(cloudProvider *CloudProvider, dsn string) (*CloudProvider, error) {
if cloudProvider != nil {
return cloudProvider, nil
}

cloudProvider = &CloudProvider{}

cfg, err := mysql.ParseDSN(dsn)
if err != nil {
return nil, err
}

host, _, err := net.SplitHostPort(cfg.Addr)
if err == nil && host != "" {
if strings.HasSuffix(host, "rds.amazonaws.com") {
matches := rdsRegex.FindStringSubmatch(host)
cloudProvider.AWS = &AWSCloudProviderInfo{
ARN: arn.ARN{
Resource: fmt.Sprintf("db:%s", matches[1]),
Region: matches[3],
AccountID: "unknown",
},
}
} else if strings.HasSuffix(host, "mysql.database.azure.com") {
matches := azureRegex.FindStringSubmatch(host)
cloudProvider.Azure = &AzureCloudProviderInfo{
Resource: matches[1],
}
}
}

return cloudProvider, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package database_observability

import (
"testing"

"github.com/aws/aws-sdk-go-v2/aws/arn"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPopulateCloudProvider(t *testing.T) {
t.Run("returns pre-populated cloud provider as-is", func(t *testing.T) {
csp := &CloudProvider{
AWS: &AWSCloudProviderInfo{
ARN: arn.ARN{Resource: "some-resource", Region: "some-region", AccountID: "some-account"},
},
}

got, err := PopulateCloudProvider(csp, "some-dsn")
require.NoError(t, err)

assert.Equal(t, csp, got)
})

t.Run("populates AWS RDS details from DSN", func(t *testing.T) {
dsn := "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/schema"
got, err := PopulateCloudProvider(nil, dsn)
require.NoError(t, err)

assert.Equal(t, &CloudProvider{
AWS: &AWSCloudProviderInfo{
ARN: arn.ARN{Resource: "db:products-db", Region: "us-east-1", AccountID: "unknown"},
},
}, got)
})

t.Run("populates Azure details from DSN", func(t *testing.T) {
dsn := "user:pass@tcp(products-db.mysql.database.azure.com:3306)/schema"
got, err := PopulateCloudProvider(nil, dsn)
require.NoError(t, err)

assert.Equal(t, &CloudProvider{
Azure: &AzureCloudProviderInfo{
Resource: "products-db",
},
}, got)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package collector

import (
"context"
"net"
"regexp"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

Expand All @@ -15,11 +12,6 @@ import (

const ConnectionInfoName = "connection_info"

var (
rdsRegex = regexp.MustCompile(`(?P<identifier>[^\.]+)\.([^\.]+)\.(?P<region>[^\.]+)\.rds\.amazonaws\.com`)
azureRegex = regexp.MustCompile(`(?P<identifier>[^\.]+)\.mysql\.database\.azure\.com`)
)

type ConnectionInfoArguments struct {
DSN string
Registry *prometheus.Registry
Expand Down Expand Up @@ -69,41 +61,24 @@ func (c *ConnectionInfo) Start(ctx context.Context) error {
engine = "mysql"
)

if c.CloudProvider != nil {
if c.CloudProvider.AWS != nil {
providerName = "aws"
providerAccount = c.CloudProvider.AWS.ARN.AccountID
providerRegion = c.CloudProvider.AWS.ARN.Region

// We only support RDS database for now. Resource types and ARN formats are documented at: https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonrds.html#amazonrds-resources-for-iam-policies
if resource := c.CloudProvider.AWS.ARN.Resource; strings.HasPrefix(resource, "db:") {
dbInstanceIdentifier = strings.TrimPrefix(resource, "db:")
}
}
} else {
cfg, err := mysql.ParseDSN(c.DSN)
if err != nil {
return err
}
csp, err := database_observability.PopulateCloudProvider(c.CloudProvider, c.DSN)
if err != nil {
return err
}

host, _, err := net.SplitHostPort(cfg.Addr)
if err == nil && host != "" {
if strings.HasSuffix(host, "rds.amazonaws.com") {
providerName = "aws"
matches := rdsRegex.FindStringSubmatch(host)
if len(matches) > 3 {
dbInstanceIdentifier = matches[1]
providerRegion = matches[3]
}
} else if strings.HasSuffix(host, "mysql.database.azure.com") {
providerName = "azure"
matches := azureRegex.FindStringSubmatch(host)
if len(matches) > 1 {
dbInstanceIdentifier = matches[1]
}
}
if csp != nil && csp.AWS != nil {
providerName = "aws"
providerAccount = csp.AWS.ARN.AccountID
providerRegion = csp.AWS.ARN.Region
// We only support RDS database for now. Resource types and ARN formats are documented at: https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonrds.html#amazonrds-resources-for-iam-policies
if resource := csp.AWS.ARN.Resource; strings.HasPrefix(resource, "db:") {
dbInstanceIdentifier = strings.TrimPrefix(resource, "db:")
}
} else if csp != nil && csp.Azure != nil {
providerName = "azure"
dbInstanceIdentifier = csp.Azure.Resource
}

c.running.Store(true)

c.InfoMetric.WithLabelValues(providerName, providerRegion, providerAccount, dbInstanceIdentifier, engine, c.EngineVersion).Set(1)
Expand Down
32 changes: 16 additions & 16 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,24 @@ func (c *Component) Update(args component.Arguments) error {
}
}

var cloudProviderInfo *database_observability.CloudProvider
if c.args.CloudProvider != nil && c.args.CloudProvider.AWS != nil {
arn, err := arn.Parse(c.args.CloudProvider.AWS.ARN)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to parse AWS cloud provider ARN", "err", err)
}
cloudProviderInfo = &database_observability.CloudProvider{
AWS: &database_observability.AWSCloudProviderInfo{
ARN: arn,
},
}
}

c.args.Targets = append([]discovery.Target{c.baseTarget}, c.args.Targets...)
targets := make([]discovery.Target, 0, len(c.args.Targets)+1)
for _, t := range c.args.Targets {
builder := discovery.NewTargetBuilderFrom(t)
if relabel.ProcessBuilder(builder, database_observability.GetRelabelingRules(serverUUID)...) {
if relabel.ProcessBuilder(builder, database_observability.GetRelabelingRules(serverUUID, cloudProviderInfo, string(c.args.DataSourceName))...) {
targets = append(targets, builder.Target())
}
}
Expand All @@ -348,7 +361,7 @@ func (c *Component) Update(args component.Arguments) error {
}
c.collectors = nil

if err := c.startCollectors(serverUUID, engineVersion, parsedEngineVersion); err != nil {
if err := c.startCollectors(serverUUID, engineVersion, parsedEngineVersion, cloudProviderInfo); err != nil {
c.reportError("failed to start collectors", err)
return nil
}
Expand Down Expand Up @@ -383,7 +396,7 @@ func enableOrDisableCollectors(a Arguments) map[string]bool {
}

// startCollectors attempts to start all of the enabled collectors. If one or more collectors fail to start, their errors are reported
func (c *Component) startCollectors(serverUUID string, engineVersion string, parsedEngineVersion semver.Version) error {
func (c *Component) startCollectors(serverUUID string, engineVersion string, parsedEngineVersion semver.Version, cloudProviderInfo *database_observability.CloudProvider) error {
var startErrors []string

logStartError := func(collectorName, action string, err error) {
Expand All @@ -392,19 +405,6 @@ func (c *Component) startCollectors(serverUUID string, engineVersion string, par
startErrors = append(startErrors, errorString)
}

var cloudProviderInfo *database_observability.CloudProvider
if c.args.CloudProvider != nil && c.args.CloudProvider.AWS != nil {
arn, err := arn.Parse(c.args.CloudProvider.AWS.ARN)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to parse AWS cloud provider ARN", "err", err)
}
cloudProviderInfo = &database_observability.CloudProvider{
AWS: &database_observability.AWSCloudProviderInfo{
ARN: arn,
},
}
}

entryHandler := addLokiLabels(loki.NewEntryHandler(c.handler.Chan(), func() {}), c.instanceKey, serverUUID)

collectors := enableOrDisableCollectors(c.args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (c *Component) Update(args component.Arguments) error {
targets := make([]discovery.Target, 0, len(c.args.Targets)+1)
for _, t := range c.args.Targets {
builder := discovery.NewTargetBuilderFrom(t)
if relabel.ProcessBuilder(builder, database_observability.GetRelabelingRules(systemID)...) {
if relabel.ProcessBuilder(builder, database_observability.GetRelabelingRules(systemID, nil, string(c.args.DataSourceName))...) {
targets = append(targets, builder.Target())
}
}
Expand Down
29 changes: 27 additions & 2 deletions internal/component/database_observability/relabeling.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,35 @@ package database_observability

import "github.com/grafana/alloy/internal/component/common/relabel"

func GetRelabelingRules(serverID string) []*relabel.Config {
func GetRelabelingRules(serverID string, cp *CloudProvider, dsn string) []*relabel.Config {
r := relabel.DefaultRelabelConfig // use default to avoid defining all fields
r.Replacement = serverID
r.TargetLabel = "server_id"
r.Action = relabel.Replace
return []*relabel.Config{&r}

rs := []*relabel.Config{&r}

populatedCloudProvider, err := PopulateCloudProvider(cp, dsn)
if err == nil {
if cp.AWS != nil {
providerName := relabel.DefaultRelabelConfig
providerName.Replacement = "aws"
providerName.TargetLabel = "provider_name"
providerName.Action = relabel.Replace

providerRegion := relabel.DefaultRelabelConfig
providerRegion.Replacement = populatedCloudProvider.AWS.ARN.Region
providerRegion.TargetLabel = "provider_region"
providerRegion.Action = relabel.Replace

providerAccount := relabel.DefaultRelabelConfig
providerAccount.Replacement = populatedCloudProvider.AWS.ARN.AccountID
providerAccount.TargetLabel = "provider_account"
providerAccount.Action = relabel.Replace

rs = append(rs, &providerName, &providerRegion, &providerAccount)
}
}

return rs
}
48 changes: 48 additions & 0 deletions internal/component/database_observability/relabeling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package database_observability

import (
"testing"

"github.com/aws/aws-sdk-go-v2/aws/arn"
"github.com/grafana/alloy/internal/component/common/relabel"
"github.com/stretchr/testify/require"
)

func Test_GetRelabelingRules(t *testing.T) {
t.Run("return relabeling rules", func(t *testing.T) {
rr := GetRelabelingRules("some-server-id", nil, "some-dsn")

require.Equal(t, 1, len(rr))
require.Equal(t, "some-server-id", rr[0].Replacement)
require.Equal(t, "server_id", rr[0].TargetLabel)
require.Equal(t, relabel.Replace, rr[0].Action)
})

t.Run("return relabeling rules with AWS config", func(t *testing.T) {
rr := GetRelabelingRules("some-server-id", &CloudProvider{
AWS: &AWSCloudProviderInfo{
ARN: arn.ARN{
Region: "some-region",
AccountID: "some-account",
},
},
}, "some-dsn")

require.Equal(t, 4, len(rr))
require.Equal(t, "some-server-id", rr[0].Replacement)
require.Equal(t, "server_id", rr[0].TargetLabel)
require.Equal(t, relabel.Replace, rr[0].Action)

require.Equal(t, "aws", rr[1].Replacement)
require.Equal(t, "provider_name", rr[1].TargetLabel)
require.Equal(t, relabel.Replace, rr[1].Action)

require.Equal(t, "some-region", rr[2].Replacement)
require.Equal(t, "provider_region", rr[2].TargetLabel)
require.Equal(t, relabel.Replace, rr[2].Action)

require.Equal(t, "some-account", rr[3].Replacement)
require.Equal(t, "provider_account", rr[3].TargetLabel)
require.Equal(t, relabel.Replace, rr[3].Action)
})
}
Loading