Commit

Fix webhook error when metrics service address uses env var expansion (…#3531)

* Use a naive approach to parse port before env var expansion

* Refactor how service metrics endpoint parsing works

Now, when there would be an error it gets logged and the default values
are returned. With this refactor the method encapsulates all defaulting
logic that was slightly spread around different places.

* Add more tests to `Service.MetricsEndpoint` and fix them

* Remove unused code

* Make Service.MetricsEndpoint fail when can't parse port

* Update documentation regarding examination of the collector config file

* Fix documentation regarding configured receivers and their ports

* Remove unrelated/confusion doc line

* Handle review feedback
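
To illustrate why the first bullet calls for parsing the port before env var expansion, here is a minimal Go sketch (not part of the commit) that checks how the standard library's `net.SplitHostPort` treats an unexpanded address. The helper name `splitFails` is hypothetical and exists only for this illustration.

```go
package main

import (
	"fmt"
	"net"
)

// splitFails is a hypothetical helper: it reports whether
// net.SplitHostPort rejects addr.
func splitFails(addr string) bool {
	_, _, err := net.SplitHostPort(addr)
	return err != nil
}

func main() {
	// A literal host has exactly one colon, so it splits cleanly.
	fmt.Println(splitFails("0.0.0.0:8888"))
	// The colon inside the unexpanded "${env:...}" reference is one colon
	// too many for an unbracketed host, so the split is rejected.
	fmt.Println(splitFails("${env:POD_IP}:8888"))
}
```

Matching the port with a regular expression anchored at the end of the string, as this commit does, sidesteps the colon inside the variable reference.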
douglascamata authored Dec 13, 2024
1 parent c344c2b commit 0fdf105
Showing 8 changed files with 231 additions and 62 deletions.
18 changes: 18 additions & 0 deletions .chloggen/fix-metrics-service-address-env-var.yaml
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: operator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix the admission webhook failing when the metrics service address host uses env var expansion

# One or more tracking issues related to the change
issues: [3513]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  This should allow the metrics service address to have the host portion expanded from an environment variable,
  like `${env:POD_IP}` instead of using `0.0.0.0`, which is [recommended by the Collector](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/security-best-practices.md#safeguards-against-denial-of-service-attacks).
8 changes: 6 additions & 2 deletions README.md
@@ -72,12 +72,16 @@ This will create an OpenTelemetry Collector instance named `simplest`, exposing

The `config` node holds the `YAML` that should be passed down as-is to the underlying OpenTelemetry Collector instances. Refer to the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector) documentation for a reference of the possible entries.

> 🚨 **NOTE:** At this point, the Operator does _not_ validate the contents of the configuration file: if the configuration is invalid, the instance will still be created but the underlying OpenTelemetry Collector might crash.
> 🚨 **NOTE:** At this point, the Operator does _not_ validate the whole contents of the configuration file: if the configuration is invalid, the instance might still be created but the underlying OpenTelemetry Collector might crash.
> 🚨 **Note:** For private GKE clusters, you will need to either add a firewall rule that allows master nodes access to port `9443/tcp` on worker nodes, or change the existing rule that allows access to port `80/tcp`, `443/tcp` and `10254/tcp` to also allow access to port `9443/tcp`. More information can be found in the [Official GCP Documentation](https://cloud.google.com/load-balancing/docs/tcp/setting-up-tcp#config-hc-firewall). See the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/private-clusters#add_firewall_rules) on adding rules and the [Kubernetes issue](https://github.com/kubernetes/kubernetes/issues/79739) for more detail.
The Operator does examine the configuration file to discover configured receivers and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations.
The Operator does examine the configuration file for a few purposes:

- To discover configured receivers and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. If the port is using environment variable expansion or cannot be parsed, an error will be returned. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations.

- To check if Collector observability is enabled (controlled by `spec.observability.metrics.enableMetrics`). In this case, a Service and ServiceMonitor/PodMonitor are created for the Collector instance. As a consequence, if the metrics service address contains an invalid port or uses environment variable expansion for the port, an error will be returned. A workaround for the environment variable case is to set `enableMetrics` to `false` and manually create the previously mentioned objects with the correct port if you need them.
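
The port rules described in the two bullets above can be sketched in Go as follows. This is a simplified, standalone illustration and not the Operator's actual code: the function name `splitMetricsAddress`, the error messages, and the exact regular expressions are assumptions made for this example, while the defaults `0.0.0.0` and `8888` are taken from the change itself.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

var (
	// Port given as an env var reference, e.g. ":${env:PORT}" or ":${PORT}".
	portEnvVar = regexp.MustCompile(`:\$\{(env:)?[^}]*\}$`)
	// Port given as trailing digits, e.g. ":8888".
	explicitPort = regexp.MustCompile(`:(\d+)$`)
)

// splitMetricsAddress is a hypothetical helper that splits a metrics
// address into host and port without expanding env vars first.
func splitMetricsAddress(addr string) (string, int32, error) {
	if addr == "" {
		return "0.0.0.0", 8888, nil // nothing configured: use the defaults
	}
	if portEnvVar.MatchString(addr) {
		// The port is only known after expansion, so it cannot be used
		// to build Service objects ahead of time.
		return "", 0, fmt.Errorf("cannot determine metrics port from %q", addr)
	}
	m := explicitPort.FindStringSubmatch(addr)
	if m == nil {
		return addr, 8888, nil // host only: fall back to the default port
	}
	port, err := strconv.ParseInt(m[1], 10, 32)
	if err != nil {
		return "", 0, fmt.Errorf("invalid metrics port in %q: %w", addr, err)
	}
	// m[0] is the trailing ":<digits>"; everything before it is the host.
	return strings.TrimSuffix(addr, m[0]), int32(port), nil
}

func main() {
	for _, addr := range []string{
		"${env:POD_IP}:8888",
		"[::]",
		"localhost:${env:PORT}",
	} {
		host, port, err := splitMetricsAddress(addr)
		fmt.Printf("addr=%q host=%q port=%d err=%v\n", addr, host, port, err)
	}
}
```

Under these assumptions, an env var in the host position is passed through untouched, while an env var in the port position is reported as an error, which is the case the `enableMetrics: false` workaround addresses.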

### Upgrades

As noted above, the OpenTelemetry Collector format is continuing to evolve. However, a best-effort attempt is made to upgrade all managed `OpenTelemetryCollector` resources.
14 changes: 12 additions & 2 deletions apis/v1beta1/collector_webhook_test.go
@@ -555,7 +555,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
ctx := context.Background()
err := cvw.Default(ctx, &test.otelcol)
if test.expected.Spec.Config.Service.Telemetry == nil {
assert.NoError(t, test.expected.Spec.Config.Service.ApplyDefaults(), "could not apply defaults")
assert.NoError(t, test.expected.Spec.Config.Service.ApplyDefaults(logr.Discard()), "could not apply defaults")
}
assert.NoError(t, err)
assert.Equal(t, test.expected, test.otelcol)
@@ -588,7 +588,17 @@ func TestOTELColValidatingWebhook(t *testing.T) {
five := int32(5)
maxInt := int32(math.MaxInt32)

cfg := v1beta1.Config{}
cfg := v1beta1.Config{
Service: v1beta1.Service{
Telemetry: &v1beta1.AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "${env:POD_ID}:8888",
},
},
},
},
}
err := yaml.Unmarshal([]byte(cfgYaml), &cfg)
require.NoError(t, err)

69 changes: 46 additions & 23 deletions apis/v1beta1/config.go
@@ -18,8 +18,8 @@ import (
"bytes"
"encoding/json"
"fmt"
"net"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
@@ -269,7 +269,7 @@ func (c *Config) getEnvironmentVariablesForComponentKinds(logger logr.Logger, co

// applyDefaultForComponentKinds applies defaults to the endpoints for the given ComponentKind(s).
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
if err := c.Service.ApplyDefaults(); err != nil {
if err := c.Service.ApplyDefaults(logger); err != nil {
return err
}
enabledComponents := c.GetEnabledComponents()
@@ -427,37 +427,60 @@ type Service struct {
Pipelines map[string]*Pipeline `json:"pipelines" yaml:"pipelines"`
}

// MetricsEndpoint gets the port number and host address for the metrics endpoint from the collector config if it has been set.
func (s *Service) MetricsEndpoint() (string, int32, error) {
defaultAddr := "0.0.0.0"
if s.GetTelemetry() == nil {
// telemetry isn't set, use the default
return defaultAddr, 8888, nil
}
host, port, netErr := net.SplitHostPort(s.GetTelemetry().Metrics.Address)
if netErr != nil && strings.Contains(netErr.Error(), "missing port in address") {
return defaultAddr, 8888, nil
} else if netErr != nil {
return "", 0, netErr
}
i64, err := strconv.ParseInt(port, 10, 32)
const (
defaultServicePort int32 = 8888
defaultServiceHost = "0.0.0.0"
)

// MetricsEndpoint attempts to get the host and port number from the host address without doing any validation regarding the
// address itself.
// It works even before env var expansion happens, when a simple `net.SplitHostPort` would fail because of the extra colon
// from the env var, i.e. the address looks like "${env:POD_IP}:4317", "${env:POD_IP}", or "${POD_IP}".
// In cases where the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}", this returns an error. This happens
// because the port is used to generate Service objects and mappings.
func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32, error) {
telemetry := s.GetTelemetry()
if telemetry == nil || telemetry.Metrics.Address == "" {
return defaultServiceHost, defaultServicePort, nil
}

// The regex below matches on strings that end with a colon followed by the environment variable expansion syntax.
// So it should match on strings ending with: ":${env:POD_IP}" or ":${POD_IP}".
const portEnvVarRegex = `:\${(env:)?.*}$`
isPortEnvVar := regexp.MustCompile(portEnvVarRegex).MatchString(telemetry.Metrics.Address)
if isPortEnvVar {
errMsg := fmt.Sprintf("couldn't determine metrics port from configuration: %s",
telemetry.Metrics.Address)
logger.Info(errMsg)
return "", 0, fmt.Errorf("%s", errMsg)
}

// The regex below matches on strings that end with a colon followed by 1 or more numbers (representing the port).
const explicitPortRegex = `:(\d+$)`
explicitPortMatches := regexp.MustCompile(explicitPortRegex).FindStringSubmatch(telemetry.Metrics.Address)
if len(explicitPortMatches) <= 1 {
return telemetry.Metrics.Address, defaultServicePort, nil
}

port, err := strconv.ParseInt(explicitPortMatches[1], 10, 32)
if err != nil {
errMsg := fmt.Sprintf("couldn't determine metrics port from configuration: %s",
telemetry.Metrics.Address)
logger.Info(errMsg, "error", err)
return "", 0, err
}

if host == "" {
host = defaultAddr
}

return host, int32(i64), nil
host, _, _ := strings.Cut(telemetry.Metrics.Address, explicitPortMatches[0])
return host, int32(port), nil
}

// ApplyDefaults inserts configuration defaults if it has not been set.
func (s *Service) ApplyDefaults() error {
telemetryAddr, telemetryPort, err := s.MetricsEndpoint()
func (s *Service) ApplyDefaults(logger logr.Logger) error {
telemetryAddr, telemetryPort, err := s.MetricsEndpoint(logger)
if err != nil {
return err
}

tm := &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
174 changes: 144 additions & 30 deletions apis/v1beta1/config_test.go
@@ -216,47 +216,157 @@ func TestGetTelemetryFromYAMLIsNil(t *testing.T) {
assert.Nil(t, cfg.Service.GetTelemetry())
}

func TestConfigToMetricsPort(t *testing.T) {

func TestConfigMetricsEndpoint(t *testing.T) {
for _, tt := range []struct {
desc string
expectedAddr string
expectedPort int32
expectedErr bool
config Service
}{
{
"custom port",
"0.0.0.0",
9090,
Service{
desc: "custom port",
expectedAddr: "localhost",
expectedPort: 9090,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "localhost:9090",
},
},
},
},
},
{
desc: "custom port ipv6",
expectedAddr: "[::]",
expectedPort: 9090,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "[::]:9090",
},
},
},
},
},
{
desc: "missing port",
expectedAddr: "localhost",
expectedPort: 8888,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "localhost",
},
},
},
},
},
{
desc: "missing port ipv6",
expectedAddr: "[::]",
expectedPort: 8888,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "[::]",
},
},
},
},
},
{
desc: "env var and missing port",
expectedAddr: "${env:POD_IP}",
expectedPort: 8888,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "${env:POD_IP}",
},
},
},
},
},
{
desc: "env var and missing port ipv6",
expectedAddr: "[${env:POD_IP}]",
expectedPort: 8888,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "[${env:POD_IP}]",
},
},
},
},
},
{
desc: "env var and with port",
expectedAddr: "${POD_IP}",
expectedPort: 1234,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "${POD_IP}:1234",
},
},
},
},
},
{
desc: "env var and with port ipv6",
expectedAddr: "[${POD_IP}]",
expectedPort: 1234,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "0.0.0.0:9090",
"address": "[${POD_IP}]:1234",
},
},
},
},
},
{
"bad address",
"0.0.0.0",
8888,
Service{
desc: "port is env var",
expectedErr: true,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "0.0.0.0",
"address": "localhost:${env:POD_PORT}",
},
},
},
},
},
{
"missing address",
"0.0.0.0",
8888,
Service{
desc: "port is env var ipv6",
expectedErr: true,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "[::]:${env:POD_PORT}",
},
},
},
},
},
{
desc: "missing address",
expectedAddr: "0.0.0.0",
expectedPort: 8888,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
@@ -267,24 +377,23 @@ func TestConfigToMetricsPort(t *testing.T) {
},
},
{
"missing metrics",
"0.0.0.0",
8888,
Service{
desc: "missing metrics",
expectedAddr: "0.0.0.0",
expectedPort: 8888,
config: Service{
Telemetry: &AnyConfig{},
},
},
{
"missing telemetry",
"0.0.0.0",
8888,
Service{},
desc: "missing telemetry",
expectedAddr: "0.0.0.0",
expectedPort: 8888,
},
{
"configured telemetry",
"1.2.3.4",
4567,
Service{
desc: "configured telemetry",
expectedAddr: "1.2.3.4",
expectedPort: 4567,
config: Service{
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
@@ -296,9 +405,14 @@ func TestConfigToMetricsPort(t *testing.T) {
},
} {
t.Run(tt.desc, func(t *testing.T) {
logger := logr.Discard()
// these are acceptable failures, we return to the collector's default metric port
addr, port, err := tt.config.MetricsEndpoint()
assert.NoError(t, err)
addr, port, err := tt.config.MetricsEndpoint(logger)
if tt.expectedErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.expectedAddr, addr)
assert.Equal(t, tt.expectedPort, port)
})