Skip to content

Commit 3659c2b

Browse files
Support proxy_url on otel translation logic (#10454) (#10591)
* Support proxy_url on otel translation logic * address comments (cherry picked from commit 0e53334) # Conflicts: # internal/pkg/otel/translate/output_elasticsearch.go Co-authored-by: Khushi Jain <[email protected]>
1 parent 616fd51 commit 3659c2b

File tree

3 files changed

+282
-0
lines changed

3 files changed

+282
-0
lines changed

internal/pkg/otel/translate/otelconfig.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,15 @@ func getBeatsAuthExtensionConfig(cfg *config.C) (map[string]any, error) {
498498
return nil, err
499499
}
500500

501+
// proxy_url on newConfig is of type url.URL. Beatsauth extension expects it to be of string type instead
502+
// this logic here converts url.URL to string type similar to what a user would set on filebeat config
503+
if defaultTransportSettings.Proxy.URL != nil {
504+
err = newConfig.SetString("proxy_url", -1, defaultTransportSettings.Proxy.URL.String())
505+
if err != nil {
506+
return nil, fmt.Errorf("error settingg proxy url:%w ", err)
507+
}
508+
}
509+
501510
var newMap map[string]any
502511
err = newConfig.Unpack(&newMap)
503512
if err != nil {

internal/pkg/otel/translate/otelconfig_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func TestGetOtelConfig(t *testing.T) {
223223
"preset": "balanced",
224224
"queue.mem.events": 3200,
225225
"ssl.enabled": true,
226+
"proxy_url": "https://example.com",
226227
}
227228

228229
for _, v := range extra {
@@ -235,6 +236,7 @@ func TestGetOtelConfig(t *testing.T) {
235236
finalOutput := map[string]any{
236237
"idle_connection_timeout": "3s",
237238
"proxy_disable": false,
239+
"proxy_url": "https://example.com",
238240
"ssl": map[string]interface{}{
239241
"ca_sha256": []interface{}{},
240242
"ca_trusted_fingerprint": "",
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package translate
6+
7+
import (
8+
"encoding/base64"
9+
"errors"
10+
"fmt"
11+
"net/url"
12+
"reflect"
13+
"strings"
14+
"time"
15+
16+
"github.com/go-viper/mapstructure/v2"
17+
18+
"github.com/elastic/beats/v7/libbeat/common"
19+
"github.com/elastic/beats/v7/libbeat/outputs"
20+
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
21+
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
22+
"github.com/elastic/elastic-agent-libs/config"
23+
"github.com/elastic/elastic-agent-libs/logp"
24+
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
25+
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
26+
)
27+
28+
type esToOTelOptions struct {
29+
elasticsearch.ElasticsearchConfig `config:",inline"`
30+
outputs.HostWorkerCfg `config:",inline"`
31+
32+
Index string `config:"index"`
33+
Pipeline string `config:"pipeline"`
34+
Preset string `config:"preset"`
35+
}
36+
37+
var defaultOptions = esToOTelOptions{
38+
ElasticsearchConfig: elasticsearch.DefaultConfig(),
39+
40+
Index: "", // Dynamic routing is disabled if index is set
41+
Pipeline: "",
42+
Preset: "custom", // default is custom if not set
43+
HostWorkerCfg: outputs.HostWorkerCfg{
44+
Workers: 1,
45+
},
46+
}
47+
48+
// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
49+
// Ensure cloudid is handled before calling this method
50+
// Note: This method may override output queue settings defined by user.
51+
func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) {
52+
escfg := defaultOptions
53+
54+
// check for unsupported config
55+
err := checkUnsupportedConfig(output, logger)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
// apply preset here
61+
// It is important to apply preset before unpacking the config, as preset can override output fields
62+
preset, err := output.String("preset", -1)
63+
if err == nil {
64+
// Performance preset is present, apply it and log any fields that
65+
// were overridden
66+
overriddenFields, presetConfig, err := elasticsearch.ApplyPreset(preset, output)
67+
if err != nil {
68+
return nil, err
69+
}
70+
logger.Infof("Applying performance preset '%v': %v",
71+
preset, config.DebugString(presetConfig, false))
72+
logger.Warnf("Performance preset '%v' overrides user setting for field(s): %s",
73+
preset, strings.Join(overriddenFields, ","))
74+
}
75+
76+
unpackedMap := make(map[string]any)
77+
// unpack and validate ES config
78+
if err := output.Unpack(&unpackedMap); err != nil {
79+
return nil, fmt.Errorf("failed unpacking config. %w", err)
80+
}
81+
82+
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
83+
Result: &escfg,
84+
TagName: "config",
85+
SquashTagOption: "inline",
86+
DecodeHook: cfgDecodeHookFunc(),
87+
})
88+
if err != nil {
89+
return nil, fmt.Errorf("failed creating decoder. %w", err)
90+
}
91+
92+
err = decoder.Decode(&unpackedMap)
93+
if err != nil {
94+
return nil, fmt.Errorf("failed decoding config. %w", err)
95+
}
96+
97+
if err := escfg.Validate(); err != nil {
98+
return nil, err
99+
}
100+
101+
// Create url using host name, protocol and path
102+
hosts := []string{}
103+
for _, h := range escfg.Hosts {
104+
esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200)
105+
if err != nil {
106+
return nil, fmt.Errorf("cannot generate ES URL from host %w", err)
107+
}
108+
hosts = append(hosts, esURL)
109+
}
110+
111+
otelYAMLCfg := map[string]any{
112+
"endpoints": hosts, // hosts, protocol, path, port
113+
114+
// max_conns_per_host is a "hard" limit on number of open connections.
115+
// Ideally, escfg.NumWorkers() should map to num_consumer, but we had a bug in upstream
116+
// where it could spin as many goroutines as it liked.
117+
// Given that batcher implementation can change and it has a history of such changes,
118+
// let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable.
119+
"max_conns_per_host": escfg.NumWorkers(),
120+
121+
// Retry
122+
"retry": map[string]any{
123+
"enabled": true,
124+
"initial_interval": escfg.Backoff.Init, // backoff.init
125+
"max_interval": escfg.Backoff.Max, // backoff.max
126+
"max_retries": escfg.MaxRetries, // max_retries
127+
},
128+
129+
"sending_queue": map[string]any{
130+
"batch": map[string]any{
131+
"flush_timeout": "10s",
132+
"max_size": escfg.BulkMaxSize, // bulk_max_size
133+
"min_size": 0, // 0 means immediately trigger a flush
134+
"sizer": "items",
135+
},
136+
"enabled": true,
137+
"queue_size": getQueueSize(logger, output),
138+
"block_on_overflow": true,
139+
"wait_for_result": true,
140+
"num_consumers": escfg.NumWorkers(),
141+
},
142+
143+
"mapping": map[string]any{
144+
"mode": "bodymap",
145+
},
146+
}
147+
148+
// Compression
149+
otelYAMLCfg["compression"] = "none"
150+
if escfg.CompressionLevel > 0 {
151+
otelYAMLCfg["compression"] = "gzip"
152+
otelYAMLCfg["compression_params"] = map[string]any{
153+
"level": escfg.CompressionLevel,
154+
}
155+
}
156+
157+
// Authentication
158+
setIfNotNil(otelYAMLCfg, "user", escfg.Username) // username
159+
setIfNotNil(otelYAMLCfg, "password", escfg.Password) // password
160+
setIfNotNil(otelYAMLCfg, "api_key", base64.StdEncoding.EncodeToString([]byte(escfg.APIKey))) // api_key
161+
162+
setIfNotNil(otelYAMLCfg, "headers", escfg.Headers) // headers
163+
setIfNotNil(otelYAMLCfg, "pipeline", escfg.Pipeline) // pipeline
164+
// Dynamic routing is disabled if output.elasticsearch.index is set
165+
setIfNotNil(otelYAMLCfg, "logs_index", escfg.Index) // index
166+
167+
// idle_connection_timeout, timeout, ssl block,
168+
// proxy_url, proxy_headers, proxy_disable are handled by beatsauthextension https://github.com/elastic/opentelemetry-collector-components/tree/main/extension/beatsauthextension
169+
// caller of this method should take care of integrating the extension
170+
171+
return otelYAMLCfg, nil
172+
}
173+
174+
// log warning for unsupported config
175+
func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error {
176+
if cfg.HasField("indices") {
177+
return fmt.Errorf("indices is currently not supported: %w", errors.ErrUnsupported)
178+
} else if cfg.HasField("pipelines") {
179+
return fmt.Errorf("pipelines is currently not supported: %w", errors.ErrUnsupported)
180+
} else if cfg.HasField("parameters") {
181+
return fmt.Errorf("parameters is currently not supported: %w", errors.ErrUnsupported)
182+
} else if value, err := cfg.Bool("allow_older_versions", -1); err == nil && !value {
183+
return fmt.Errorf("allow_older_versions:false is currently not supported: %w", errors.ErrUnsupported)
184+
} else if cfg.HasField("loadbalance") {
185+
return fmt.Errorf("loadbalance is currently not supported: %w", errors.ErrUnsupported)
186+
} else if cfg.HasField("non_indexable_policy") {
187+
return fmt.Errorf("non_indexable_policy is currently not supported: %w", errors.ErrUnsupported)
188+
} else if cfg.HasField("escape_html") {
189+
return fmt.Errorf("escape_html is currently not supported: %w", errors.ErrUnsupported)
190+
} else if cfg.HasField("kerberos") {
191+
return fmt.Errorf("kerberos is currently not supported: %w", errors.ErrUnsupported)
192+
}
193+
194+
return nil
195+
}
196+
197+
// Helper function to check if a struct is empty
198+
func isStructEmpty(s any) bool {
199+
return reflect.DeepEqual(s, reflect.Zero(reflect.TypeOf(s)).Interface())
200+
}
201+
202+
// Helper function to conditionally add fields to the map
203+
func setIfNotNil(m map[string]any, key string, value any) {
204+
if value == nil {
205+
return
206+
}
207+
208+
v := reflect.ValueOf(value)
209+
210+
switch v.Kind() {
211+
case reflect.String:
212+
if v.String() != "" {
213+
m[key] = value
214+
}
215+
case reflect.Map, reflect.Slice:
216+
if v.Len() > 0 {
217+
m[key] = value
218+
}
219+
case reflect.Struct:
220+
if !isStructEmpty(value) {
221+
m[key] = value
222+
}
223+
default:
224+
m[key] = value
225+
}
226+
}
227+
228+
func getQueueSize(logger *logp.Logger, output *config.C) int {
229+
size, err := output.Int("queue.mem.events", -1)
230+
if err != nil {
231+
logger.Debugf("Failed to get queue size: %v", err)
232+
return memqueue.DefaultEvents // return default queue.mem.events for sending_queue in case of an errr
233+
}
234+
return int(size)
235+
}
236+
237+
func cfgDecodeHookFunc() mapstructure.DecodeHookFunc {
238+
return func(
239+
f reflect.Type,
240+
t reflect.Type,
241+
data any,
242+
) (any, error) {
243+
if f.Kind() != reflect.String {
244+
return data, nil
245+
}
246+
247+
switch {
248+
case t == reflect.TypeOf(time.Duration(5)):
249+
d, err := time.ParseDuration(data.(string))
250+
if err != nil {
251+
return d, fmt.Errorf("failed parsing duration: %w", err)
252+
} else {
253+
return d, nil
254+
}
255+
case t == reflect.TypeOf(tlscommon.TLSVerificationMode(0)):
256+
verificationMode := tlscommon.TLSVerificationMode(0)
257+
if err := verificationMode.Unpack(data); err != nil {
258+
return nil, fmt.Errorf("failed parsing TLS verification mode: %w", err)
259+
}
260+
return verificationMode, nil
261+
case t == reflect.TypeOf(httpcommon.ProxyURI(url.URL{})):
262+
proxyURL := httpcommon.ProxyURI(url.URL{})
263+
if err := proxyURL.Unpack(data.(string)); err != nil {
264+
return nil, fmt.Errorf("failed parsing proxy_url: %w", err)
265+
}
266+
return proxyURL, nil
267+
default:
268+
return data, nil
269+
}
270+
}
271+
}

0 commit comments

Comments
 (0)