Skip to content

Commit

Permalink
[HCP Telemetry] Move first TelemetryConfig Fetch into the TelemetryCo…
Browse files Browse the repository at this point in the history
…nfigProvider (#18318)

* Add Enabler interface to turn sink on/off

* Use h for hcpProviderImpl vars, fix PR feedback and fix errors

* Keep nil check in exporter and fix tests

* Clarify comment and fix function name

* Use disable instead of enable

* Fix errors nit in otlp_transform

* Add test for refreshInterval of updateConfig

* Add disabled field in MetricsConfig struct

* Fix PR feedback: improve comment and remove double colons

* Fix deps test which requires a maybe

* Update hcp-sdk-go to v0.61.0

* use disabled flag in telemetry_config.go

* Handle 4XX errors in telemetry_provider

* Fix deps test

* Check 4XX instead

* Run make go-mod-tidy
  • Loading branch information
Achooo authored Aug 30, 2023
1 parent 58e5658 commit 0f48b7a
Show file tree
Hide file tree
Showing 19 changed files with 321 additions and 310 deletions.
19 changes: 8 additions & 11 deletions agent/hcp/client/telemetry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

var (
// DefaultMetricFilters is a regex that matches all metric names.
defaultMetricFilters = regexp.MustCompile(".+")
DefaultMetricFilters = regexp.MustCompile(".+")

// Validation errors for AgentTelemetryConfigOK response.
errMissingPayload = errors.New("missing payload")
Expand All @@ -29,6 +29,7 @@ var (
errMissingMetricsConfig = errors.New("missing metrics config")
errInvalidRefreshInterval = errors.New("invalid refresh interval")
errInvalidEndpoint = errors.New("invalid metrics endpoint")
errEmptyEndpoint = errors.New("empty metrics endpoint")
)

// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
Expand All @@ -43,18 +44,14 @@ type MetricsConfig struct {
Labels map[string]string
Filters *regexp.Regexp
Endpoint *url.URL
Disabled bool
}

// RefreshConfig contains configuration for the periodic fetch of configuration from HCP.
type RefreshConfig struct {
RefreshInterval time.Duration
}

// MetricsEnabled returns true if metrics export is enabled, i.e. a valid metrics endpoint exists.
func (t *TelemetryConfig) MetricsEnabled() bool {
return t.MetricsConfig.Endpoint != nil
}

// validateAgentTelemetryConfigPayload ensures the returned payload from HCP is valid.
func validateAgentTelemetryConfigPayload(resp *hcptelemetry.AgentTelemetryConfigOK) error {
if resp.Payload == nil {
Expand Down Expand Up @@ -86,7 +83,7 @@ func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.Agent
telemetryConfig := resp.Payload.TelemetryConfig
metricsEndpoint, err := convertMetricEndpoint(telemetryConfig.Endpoint, telemetryConfig.Metrics.Endpoint)
if err != nil {
return nil, errInvalidEndpoint
return nil, err
}

metricsFilters := convertMetricFilters(ctx, telemetryConfig.Metrics.IncludeList)
Expand All @@ -97,6 +94,7 @@ func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.Agent
Endpoint: metricsEndpoint,
Labels: metricLabels,
Filters: metricsFilters,
Disabled: telemetryConfig.Metrics.Disabled,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: refreshInterval,
Expand All @@ -114,9 +112,8 @@ func convertMetricEndpoint(telemetryEndpoint string, metricsEndpoint string) (*u
endpoint = metricsEndpoint
}

// If endpoint is empty, server not registered with CCM, no error returned.
if endpoint == "" {
return nil, nil
return nil, errEmptyEndpoint
}

// Endpoint from CTW has no metrics path, so it must be added.
Expand Down Expand Up @@ -145,15 +142,15 @@ func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.

if len(validFilters) == 0 {
logger.Error("no valid filters")
return defaultMetricFilters
return DefaultMetricFilters
}

// Combine the valid regex strings with OR.
finalRegex := strings.Join(validFilters, "|")
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
logger.Error("failed to compile final regex", "error", err)
return defaultMetricFilters
return DefaultMetricFilters
}

return composedRegex
Expand Down
47 changes: 5 additions & 42 deletions agent/hcp/client/telemetry_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
wantErr error
expectedEnabled bool
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand All @@ -115,34 +114,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"successNoEndpoint": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "",
Labels: map[string]string{"test": "test"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
IncludeList: []string{"test", "consul"},
},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "2s",
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: nil,
Labels: map[string]string{"test": "test"},
Filters: validTestFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: false,
},
"successBadFilters": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand All @@ -163,13 +134,12 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
MetricsConfig: &MetricsConfig{
Endpoint: validTestURL,
Labels: map[string]string{"test": "test"},
Filters: defaultMetricFilters,
Filters: DefaultMetricFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"errorsWithInvalidRefreshInterval": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand Down Expand Up @@ -209,7 +179,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
}
require.NoError(t, err)
require.Equal(t, tc.expectedTelemetryCfg, telemetryCfg)
require.Equal(t, tc.expectedEnabled, telemetryCfg.MetricsEnabled())
})
}
}
Expand All @@ -231,10 +200,10 @@ func TestConvertMetricEndpoint(t *testing.T) {
override: "https://override.com",
expected: "https://override.com/v1/metrics",
},
"noErrorWithEmptyEndpoints": {
"errorWithEmptyEndpoints": {
endpoint: "",
override: "",
expected: "",
wantErr: errEmptyEndpoint,
},
"errorWithInvalidURL": {
endpoint: " ",
Expand All @@ -252,12 +221,6 @@ func TestConvertMetricEndpoint(t *testing.T) {
return
}

if tc.expected == "" {
require.Nil(t, u)
require.NoError(t, err)
return
}

require.NotNil(t, u)
require.NoError(t, err)
require.Equal(t, tc.expected, u.String())
Expand All @@ -277,13 +240,13 @@ func TestConvertMetricFilters(t *testing.T) {
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"emptyRegex": {
filters: []string{},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
Expand Down
39 changes: 11 additions & 28 deletions agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ package hcp
import (
"context"
"fmt"
"time"

"github.com/armon/go-metrics"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/go-hclog"
)

// Deps contains the interfaces that the rest of Consul core depends on for HCP integration.
type Deps struct {
Client hcpclient.Client
Client client.Client
Provider scada.Provider
Sink metrics.MetricSink
}
Expand All @@ -27,7 +27,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
ctx := context.Background()
ctx = hclog.WithContext(ctx, logger)

client, err := hcpclient.NewClient(cfg)
hcpClient, err := client.NewClient(cfg)
if err != nil {
return Deps{}, fmt.Errorf("failed to init client: %w", err)
}
Expand All @@ -37,50 +37,33 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
}

metricsClient, err := hcpclient.NewMetricsClient(ctx, &cfg)
metricsClient, err := client.NewMetricsClient(ctx, &cfg)
if err != nil {
logger.Error("failed to init metrics client", "error", err)
return Deps{}, fmt.Errorf("failed to init metrics client: %w", err)
}

sink, err := sink(ctx, client, metricsClient)
sink, err := sink(ctx, metricsClient, NewHCPProvider(ctx, hcpClient))
if err != nil {
// Do not prevent server start if sink init fails, only log error.
logger.Error("failed to init sink", "error", err)
}

return Deps{
Client: client,
Client: hcpClient,
Provider: provider,
Sink: sink,
}, nil
}

// sink initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, errors are returned, only to be logged.
func sink(
ctx context.Context,
hcpClient hcpclient.Client,
metricsClient telemetry.MetricsClient,
cfgProvider *hcpProviderImpl,
) (metrics.MetricSink, error) {
logger := hclog.FromContext(ctx).Named("sink")
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx)
if err != nil {
return nil, fmt.Errorf("failed to fetch telemetry config: %w", err)
}

if !telemetryCfg.MetricsEnabled() {
return nil, nil
}

cfgProvider, err := NewHCPProvider(ctx, hcpClient, telemetryCfg)
if err != nil {
return nil, fmt.Errorf("failed to init config provider: %w", err)
}
logger := hclog.FromContext(ctx)

reader := telemetry.NewOTELReader(metricsClient, cfgProvider)
sinkOpts := &telemetry.OTELSinkOpts{
Expand All @@ -90,7 +73,7 @@ func sink(

sink, err := telemetry.NewOTELSink(ctx, sinkOpts)
if err != nil {
return nil, fmt.Errorf("failed create OTELSink: %w", err)
return nil, fmt.Errorf("failed to create OTELSink: %w", err)
}

logger.Debug("initialized HCP metrics sink")
Expand Down
84 changes: 5 additions & 79 deletions agent/hcp/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@ package hcp

import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/telemetry"
)

Expand All @@ -24,79 +18,11 @@ type mockMetricsClient struct {

func TestSink(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
expect func(*client.MockClient)
wantErr string
expectedSink bool
}{
"success": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
filters, _ := regexp.Compile("test")
mt := mockTelemetryConfig(1*time.Second, u, filters)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
expectedSink: true,
},
"noSinkWhenFetchTelemetryConfigFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
},
wantErr: "failed to fetch telemetry config",
},
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mt := mockTelemetryConfig(1*time.Second, nil, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
},
"noSinkWhenTelemetryConfigProviderInitFails": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
// Bad refresh interval forces ConfigProvider creation failure.
mt := mockTelemetryConfig(0*time.Second, u, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
wantErr: "failed to init config provider",
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
c := client.NewMockClient(t)
mc := mockMetricsClient{}

test.expect(c)
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, err := sink(ctx, mockMetricsClient{}, &hcpProviderImpl{})

s, err := sink(ctx, c, mc)

if test.wantErr != "" {
require.NotNil(t, err)
require.Contains(t, err.Error(), test.wantErr)
require.Nil(t, s)
return
}

if !test.expectedSink {
require.Nil(t, s)
require.Nil(t, err)
return
}

require.NotNil(t, s)
})
}
}

func mockTelemetryConfig(refreshInterval time.Duration, metricsEndpoint *url.URL, filters *regexp.Regexp) *client.TelemetryConfig {
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: metricsEndpoint,
Filters: filters,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: refreshInterval,
},
}
require.NotNil(t, s)
require.NoError(t, err)
}
Loading

0 comments on commit 0f48b7a

Please sign in to comment.