From 8bfb7af256f6a106b98f7e0c8e6dea422bc084c7 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 27 Feb 2025 09:46:04 +0800 Subject: [PATCH] Use new exporterbatcher.SizeConfig See https://github.com/open-telemetry/opentelemetry-collector/pull/12486 --- .../elasticsearchexporter-sizeconfig.yaml | 27 ++++++++++ exporter/elasticsearchexporter/README.md | 7 ++- exporter/elasticsearchexporter/config.go | 22 ++++++++ exporter/elasticsearchexporter/config_test.go | 51 +++++++++++++------ exporter/elasticsearchexporter/factory.go | 8 ++- .../testdata/config.yaml | 12 +++++ 6 files changed, 105 insertions(+), 22 deletions(-) create mode 100644 .chloggen/elasticsearchexporter-sizeconfig.yaml diff --git a/.chloggen/elasticsearchexporter-sizeconfig.yaml b/.chloggen/elasticsearchexporter-sizeconfig.yaml new file mode 100644 index 000000000000..a16e136692fb --- /dev/null +++ b/.chloggen/elasticsearchexporter-sizeconfig.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate `batcher::min_size_items` and `batcher::max_size_items` in favor of `batcher::min_size` and `batcher::max_size`. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [38243] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index a68157b22053..08a37a55e97a 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -94,8 +94,11 @@ The Elasticsearch exporter supports the [common `batcher` settings](https://gith - `batcher`: - `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`. - - `min_size_items` (default=5000): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush. - - `max_size_items` (default=0): Maximum number of log records / spans / data points in a batched request. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size_items` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections. + - `sizer` (default=items): Unit of `min_size` and `max_size`. Currently supports only "items", in the future will also support "bytes". + - `min_size` (default=5000): Minimum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`. + - `max_size` (default=0): Maximum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections. + - `min_size_items` (DEPRECATED, use `batcher::min_size` instead): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush. + - `max_size_items` (DEPRECATED, use `batcher::max_size` instead): Maximum number of log records / spans / data points in a batched request. - `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the batcher buffer, aka "max age of batcher buffer". A batcher flush will happen regardless of the size of content in batcher buffer. By default, the exporter will perform its own buffering and batching, as configured through the diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 3d4756d4ef32..5eaa91fe51e8 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" @@ -96,10 +97,31 @@ type BatcherConfig struct { // FlushTimeout sets the time after which a batch will be sent regardless of its size. FlushTimeout time.Duration `mapstructure:"flush_timeout"` + // SizeConfig sets the size limits for a batch. + exporterbatcher.SizeConfig `mapstructure:",squash"` + + // Deprecated: [v0.121.0] Ignored if SizeConfig is set. exporterbatcher.MinSizeConfig `mapstructure:",squash"` + // Deprecated: [v0.121.0] Ignored if SizeConfig is set. exporterbatcher.MaxSizeConfig `mapstructure:",squash"` } +func (c *BatcherConfig) Unmarshal(conf *confmap.Conf) error { + if err := conf.Unmarshal(c); err != nil { + return err + } + + if c.MinSizeItems != nil && !conf.IsSet("min_size") { + c.SizeConfig.MinSize = *c.MinSizeItems + } + + if c.MaxSizeItems != nil && !conf.IsSet("max_size") { + c.SizeConfig.MaxSize = *c.MaxSizeItems + } + + return nil +} + type TelemetrySettings struct { LogRequestBody bool `mapstructure:"log_request_body"` LogResponseBody bool `mapstructure:"log_response_body"` diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index b8009f2b95a0..9651c1fe3a7a 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -118,11 +118,9 @@ func TestConfig(t *testing.T) { }, Batcher: BatcherConfig{ FlushTimeout: 30 * time.Second, - MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck - MinSizeItems: &defaultBatcherMinSizeItems, - }, - MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck - MaxSizeItems: nil, + SizeConfig: exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: defaultBatcherMinSizeItems, }, }, }, @@ -192,11 +190,9 @@ func TestConfig(t *testing.T) { }, Batcher: BatcherConfig{ FlushTimeout: 30 * time.Second, - MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck - MinSizeItems: &defaultBatcherMinSizeItems, - }, - MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck - MaxSizeItems: nil, + SizeConfig: exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: defaultBatcherMinSizeItems, }, }, }, @@ -266,11 +262,9 @@ func TestConfig(t *testing.T) { }, Batcher: BatcherConfig{ FlushTimeout: 30 * time.Second, - MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck - MinSizeItems: &defaultBatcherMinSizeItems, - }, - MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck - MaxSizeItems: nil, + SizeConfig: exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: defaultBatcherMinSizeItems, }, }, }, @@ -327,6 +321,33 @@ func TestConfig(t *testing.T) { cfg.Compression = "gzip" }), }, + { + id: component.NewIDWithName(metadata.Type, "batcher_minmax_size_items"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + + cfg.Batcher.MinSize = 100 + cfg.Batcher.MaxSize = 200 + cfg.Batcher.MinSizeItems = &cfg.Batcher.MinSize + cfg.Batcher.MaxSizeItems = &cfg.Batcher.MaxSize + }), + }, + { + id: component.NewIDWithName(metadata.Type, "batcher_minmax_size"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + + cfg.Batcher.MinSize = 100 + cfg.Batcher.MaxSize = 200 + + minSizeItems := 300 + maxSizeItems := 400 + cfg.Batcher.MinSizeItems = &minSizeItems + cfg.Batcher.MaxSizeItems = &maxSizeItems + }), + }, } for _, tt := range tests { diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 8d77b501bc28..9fe545465df4 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -95,11 +95,9 @@ func createDefaultConfig() component.Config { }, Batcher: BatcherConfig{ FlushTimeout: 30 * time.Second, - MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck - MinSizeItems: &defaultBatcherMinSizeItems, - }, - MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck - MaxSizeItems: nil, + SizeConfig: exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: defaultBatcherMinSizeItems, }, }, Flush: FlushSettings{ diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 92945c7f1b1c..26740b8fb1a5 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -89,3 +89,15 @@ elasticsearch/compression_none: elasticsearch/compression_gzip: endpoint: https://elastic.example.com:9200 compression: gzip +elasticsearch/batcher_minmax_size_items: + endpoint: https://elastic.example.com:9200 + batcher: + min_size_items: 100 + max_size_items: 200 +elasticsearch/batcher_minmax_size: + endpoint: https://elastic.example.com:9200 + batcher: + min_size: 100 + max_size: 200 + min_size_items: 300 # min_size is set, does not carry over + max_size_items: 400 # max_size is set, does not carry over