Skip to content

Commit

Permalink
Use new exporterbatcher.SizeConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Feb 27, 2025
1 parent b2d5487 commit c271783
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 51 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter-sizeconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate `batcher::min_size_items` and `batcher::max_size_items` in favor of `batcher::min_size` and `batcher::max_size`.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38243]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 5 additions & 2 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ The Elasticsearch exporter supports the [common `batcher` settings](https://gith

- `batcher`:
- `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`.
- `min_size_items` (default=5000): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush.
- `max_size_items` (default=0): Maximum number of log records / spans / data points in a batched request. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size_items` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections.
- `sizer` (default=items): Unit of `min_size` and `max_size`. Currently only "items" is supported; "bytes" will be supported in the future.
- `min_size` (default=5000): Minimum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`.
- `max_size` (default=0): Maximum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections.
- `min_size_items` (DEPRECATED, use `batcher::min_size` instead): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush.
- `max_size_items` (DEPRECATED, use `batcher::max_size` instead): Maximum number of log records / spans / data points in a batched request.
- `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the batcher buffer, aka "max age of batcher buffer". A batcher flush will happen regardless of the size of content in batcher buffer.

By default, the exporter will perform its own buffering and batching, as configured through the
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type bulkIndexerSession interface {
const defaultMaxRetries = 2

func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (bulkIndexer, error) {
if config.Batcher.Enabled != nil {
if config.Batcher.enabledSet {
return newSyncBulkIndexer(logger, client, config, requireDataStream), nil
}
return newAsyncBulkIndexer(logger, client, config, requireDataStream)
Expand Down
21 changes: 13 additions & 8 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,16 +89,20 @@ type Config struct {
// This is a slightly modified version of exporterbatcher.Config,
// to enable tri-state Enabled: unset, false, true.
type BatcherConfig struct {
// Enabled indicates whether to enqueue batches before sending
// to the exporter. If Enabled is specified (non-nil),
// then the exporter will not perform any buffering itself.
Enabled *bool `mapstructure:"enabled"`
exporterbatcher.Config `mapstructure:",squash"`

// FlushTimeout sets the time after which a batch will be sent regardless of its size.
FlushTimeout time.Duration `mapstructure:"flush_timeout"`
// enabledSet tracks whether Enabled has been specified.
// If enabledSet is false, the exporter will perform its
// own buffering.
enabledSet bool `mapstructure:"-"`
}

exporterbatcher.MinSizeConfig `mapstructure:",squash"`
exporterbatcher.MaxSizeConfig `mapstructure:",squash"`
// Unmarshal implements confmap.Unmarshaler. It decodes conf into c and
// additionally records whether the "enabled" key was explicitly present,
// allowing the exporter to distinguish an unset value from an explicit
// false (tri-state behavior the embedded exporterbatcher.Config cannot
// express on its own).
func (c *BatcherConfig) Unmarshal(conf *confmap.Conf) error {
	err := conf.Unmarshal(c)
	if err != nil {
		return err
	}
	// Track explicit presence of "enabled"; when absent, the exporter
	// falls back to performing its own buffering.
	c.enabledSet = conf.IsSet("enabled")
	return nil
}

type TelemetrySettings struct {
Expand Down
70 changes: 50 additions & 20 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ func TestConfig(t *testing.T) {
DateFormat: "%Y.%m.%d",
},
Batcher: BatcherConfig{
FlushTimeout: 30 * time.Second,
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
MinSizeItems: &defaultBatcherMinSizeItems,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
MaxSizeItems: nil,
Config: exporterbatcher.Config{
FlushTimeout: 30 * time.Second,
SizeConfig: exporterbatcher.SizeConfig{
Sizer: exporterbatcher.SizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
},
},
Expand Down Expand Up @@ -191,12 +191,12 @@ func TestConfig(t *testing.T) {
DateFormat: "%Y.%m.%d",
},
Batcher: BatcherConfig{
FlushTimeout: 30 * time.Second,
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
MinSizeItems: &defaultBatcherMinSizeItems,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
MaxSizeItems: nil,
Config: exporterbatcher.Config{
FlushTimeout: 30 * time.Second,
SizeConfig: exporterbatcher.SizeConfig{
Sizer: exporterbatcher.SizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
},
},
Expand Down Expand Up @@ -265,12 +265,12 @@ func TestConfig(t *testing.T) {
DateFormat: "%Y.%m.%d",
},
Batcher: BatcherConfig{
FlushTimeout: 30 * time.Second,
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
MinSizeItems: &defaultBatcherMinSizeItems,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
MaxSizeItems: nil,
Config: exporterbatcher.Config{
FlushTimeout: 30 * time.Second,
SizeConfig: exporterbatcher.SizeConfig{
Sizer: exporterbatcher.SizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
},
},
Expand Down Expand Up @@ -305,8 +305,8 @@ func TestConfig(t *testing.T) {
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"

enabled := false
cfg.Batcher.Enabled = &enabled
cfg.Batcher.Enabled = false
cfg.Batcher.enabledSet = true
}),
},
{
Expand All @@ -327,6 +327,36 @@ func TestConfig(t *testing.T) {
cfg.Compression = "gzip"
}),
},
{
id: component.NewIDWithName(metadata.Type, "batcher_minmax_size_items"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"

cfg.Batcher.MinSize = 100
cfg.Batcher.MaxSize = 200
cfg.Batcher.MinSizeItems = &cfg.Batcher.MinSize //nolint:staticcheck
cfg.Batcher.MaxSizeItems = &cfg.Batcher.MaxSize //nolint:staticcheck
}),
},
{
id: component.NewIDWithName(metadata.Type, "batcher_minmax_size"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"

cfg.Batcher.MinSize = 100
cfg.Batcher.MaxSize = 200

// TODO uncomment setting min/max_size_items in config.yaml
// and uncomment the below, when the fix to ignore those fields
// is brought into contrib.
// minSizeItems := 300
// maxSizeItems := 400
// cfg.Batcher.MinSizeItems = &minSizeItems
// cfg.Batcher.MaxSizeItems = &maxSizeItems
}),
},
}

for _, tt := range tests {
Expand Down
15 changes: 9 additions & 6 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/auth/authtest"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -772,12 +773,11 @@ func TestExporterLogs(t *testing.T) {

cfgs := map[string]func(*Config){
"async": func(cfg *Config) {
batcherEnabled := false
cfg.Batcher.Enabled = &batcherEnabled
cfg.Batcher.enabledSet = false
},
"sync": func(cfg *Config) {
batcherEnabled := true
cfg.Batcher.Enabled = &batcherEnabled
cfg.Batcher.enabledSet = true
cfg.Batcher.Enabled = true
cfg.Batcher.FlushTimeout = 10 * time.Millisecond
},
}
Expand Down Expand Up @@ -1965,9 +1965,12 @@ func TestExporterAuth(t *testing.T) {
func TestExporterBatcher(t *testing.T) {
var requests []*http.Request
testauthID := component.NewID(component.MustNewType("authtest"))
batcherEnabled := false // sync bulk indexer is used without batching
exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) {
cfg.Batcher = BatcherConfig{Enabled: &batcherEnabled}
cfg.Batcher = BatcherConfig{
// sync bulk indexer is used without batching
Config: exporterbatcher.Config{Enabled: false},
enabledSet: true,
}
cfg.Auth = &configauth.Authentication{AuthenticatorID: testauthID}
cfg.Retry.Enabled = false
})
Expand Down
22 changes: 8 additions & 14 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func createDefaultConfig() component.Config {
LogResponseBody: false,
},
Batcher: BatcherConfig{
FlushTimeout: 30 * time.Second,
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
MinSizeItems: &defaultBatcherMinSizeItems,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
MaxSizeItems: nil,
Config: exporterbatcher.Config{
FlushTimeout: 30 * time.Second,
SizeConfig: exporterbatcher.SizeConfig{
Sizer: exporterbatcher.SizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
},
Flush: FlushSettings{
Expand Down Expand Up @@ -203,14 +203,8 @@ func exporterhelperOptions(
exporterhelper.WithShutdown(shutdown),
exporterhelper.WithQueue(cfg.QueueSettings),
}
if cfg.Batcher.Enabled != nil {
batcherConfig := exporterbatcher.Config{
Enabled: *cfg.Batcher.Enabled,
FlushTimeout: cfg.Batcher.FlushTimeout,
MinSizeConfig: cfg.Batcher.MinSizeConfig,
MaxSizeConfig: cfg.Batcher.MaxSizeConfig,
}
opts = append(opts, exporterhelper.WithBatcher(batcherConfig))
if cfg.Batcher.enabledSet {
opts = append(opts, exporterhelper.WithBatcher(cfg.Batcher.Config))

// Effectively disable timeout_sender because timeout is enforced in bulk indexer.
//
Expand Down
12 changes: 12 additions & 0 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,15 @@ elasticsearch/compression_none:
elasticsearch/compression_gzip:
endpoint: https://elastic.example.com:9200
compression: gzip
elasticsearch/batcher_minmax_size_items:
endpoint: https://elastic.example.com:9200
batcher:
min_size_items: 100
max_size_items: 200
elasticsearch/batcher_minmax_size:
endpoint: https://elastic.example.com:9200
batcher:
min_size: 100
max_size: 200
#min_size_items: 300 # min_size is set, does not carry over
#max_size_items: 400 # max_size is set, does not carry over

0 comments on commit c271783

Please sign in to comment.