diff --git a/.chloggen/finishsize.yaml b/.chloggen/finishsize.yaml new file mode 100644 index 000000000000..57501528d34e --- /dev/null +++ b/.chloggen/finishsize.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate `min_size_items` and `max_size_items` in favor of `min_size` and `max_size`. + +# One or more tracking issues or pull requests related to the change +issues: [12486] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/finishsize_breaking.yaml b/.chloggen/finishsize_breaking.yaml new file mode 100644 index 000000000000..3d072a23cd91 --- /dev/null +++ b/.chloggen/finishsize_breaking.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: Update MergeSplit function signature to use the new SizeConfig + +# One or more tracking issues or pull requests related to the change +issues: [12486] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/finishsize_deprecation.yaml b/.chloggen/finishsize_deprecation.yaml new file mode 100644 index 000000000000..c9678d287669 --- /dev/null +++ b/.chloggen/finishsize_deprecation.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate MinSizeConfig and MaxSizeItems. + +# One or more tracking issues or pull requests related to the change +issues: [12486] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterbatcher/config.go b/exporter/exporterbatcher/config.go index 922a051c0fc1..a3c56e5cf43c 100644 --- a/exporter/exporterbatcher/config.go +++ b/exporter/exporterbatcher/config.go @@ -6,10 +6,11 @@ package exporterbatcher // import "go.opentelemetry.io/collector/exporter/export import ( "errors" "time" + + "go.opentelemetry.io/collector/confmap" ) // Config defines a configuration for batching requests based on a timeout and a minimum number of items. -// MaxSizeItems defines batch splitting functionality if it's more than zero. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type Config struct { @@ -19,66 +20,70 @@ type Config struct { // FlushTimeout sets the time after which a batch will be sent regardless of its size. FlushTimeout time.Duration `mapstructure:"flush_timeout"` + // SizeConfig sets the size limits for a batch. SizeConfig `mapstructure:",squash"` - // Deprecated. Ignored if SizeConfig is set. + // Deprecated: [v0.121.0] Ignored if SizeConfig is set. MinSizeConfig `mapstructure:",squash"` - // Deprecated. Ignored if SizeConfig is set. + // Deprecated: [v0.121.0] Ignored if SizeConfig is set. MaxSizeConfig `mapstructure:",squash"` } +// SizeConfig sets the size limits for a batch. type SizeConfig struct { Sizer SizerType `mapstructure:"sizer"` - MinSize int `mapstructure:"mix_size"` + // MinSize defines the configuration for the minimum size of a batch. + MinSize int `mapstructure:"min_size"` + // MaxSize defines the configuration for the maximum size of a batch. MaxSize int `mapstructure:"max_size"` } -// MinSizeConfig defines the configuration for the minimum number of items in a batch. 
-// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +// Deprecated: [v0.121.0] use SizeConfig. type MinSizeConfig struct { - // MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be - // sent regardless of the timeout. There is no guarantee that the batch size always greater than this value. - // This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored. - MinSizeItems int `mapstructure:"min_size_items"` + // Deprecated: [v0.121.0] use SizeConfig.MinSize. + MinSizeItems *int `mapstructure:"min_size_items"` } -// MaxSizeConfig defines the configuration for the maximum number of items in a batch. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +// Deprecated: [v0.121.0] use SizeConfig. type MaxSizeConfig struct { - // MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP. - // If the batch size exceeds this value, it will be broken up into smaller batches if possible. - // Setting this value to zero disables the maximum size limit. - MaxSizeItems int `mapstructure:"max_size_items"` + // Deprecated: [v0.121.0] use SizeConfig.MaxSize. 
+ MaxSizeItems *int `mapstructure:"max_size_items"` } -func (c Config) Validate() error { - if c.MinSizeItems < 0 { - return errors.New("min_size_items must be greater than or equal to zero") +func (c *Config) Unmarshal(conf *confmap.Conf) error { + if err := conf.Unmarshal(c); err != nil { + return err } - if c.MaxSizeItems < 0 { - return errors.New("max_size_items must be greater than or equal to zero") + + if c.MinSizeItems != nil { + c.SizeConfig.MinSize = *c.MinSizeItems } - if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems { - return errors.New("max_size_items must be greater than or equal to min_size_items") + + if c.MaxSizeItems != nil { + c.SizeConfig.MaxSize = *c.MaxSizeItems } + + return nil +} + +func (c *Config) Validate() error { if c.FlushTimeout <= 0 { - return errors.New("timeout must be greater than zero") + return errors.New("`flush_timeout` must be greater than zero") } + return nil } func (c SizeConfig) Validate() error { if c.MinSize < 0 { - return errors.New("min_size must be greater than or equal to zero") + return errors.New("`min_size` must be greater than or equal to zero") } if c.MaxSize < 0 { - return errors.New("max_size must be greater than or equal to zero") + return errors.New("`max_size` must be greater than or equal to zero") } if c.MaxSize != 0 && c.MaxSize < c.MinSize { - return errors.New("max_size must be greater than or equal to mix_size") + return errors.New("`max_size` must be greater than or equal to min_size") } return nil } @@ -87,8 +92,9 @@ func NewDefaultConfig() Config { return Config{ Enabled: true, FlushTimeout: 200 * time.Millisecond, - MinSizeConfig: MinSizeConfig{ - MinSizeItems: 8192, + SizeConfig: SizeConfig{ + Sizer: SizerTypeItems, + MinSize: 8192, }, } } diff --git a/exporter/exporterbatcher/config_test.go b/exporter/exporterbatcher/config_test.go index 742950c9aad4..cf247367bb70 100644 --- a/exporter/exporterbatcher/config_test.go +++ b/exporter/exporterbatcher/config_test.go @@ -6,31 +6,16 @@ 
package exporterbatcher import ( "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" - - "go.opentelemetry.io/collector/confmap" ) func TestValidateConfig(t *testing.T) { cfg := NewDefaultConfig() require.NoError(t, cfg.Validate()) - cfg.MinSizeItems = -1 - require.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero") - cfg = NewDefaultConfig() cfg.FlushTimeout = 0 - require.EqualError(t, cfg.Validate(), "timeout must be greater than zero") - - cfg.MaxSizeItems = -1 - require.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero") - - cfg = NewDefaultConfig() - cfg.MaxSizeItems = 20000 - cfg.MinSizeItems = 20001 - assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items") + require.EqualError(t, cfg.Validate(), "`flush_timeout` must be greater than zero") } func TestValidateSizeConfig(t *testing.T) { @@ -39,45 +24,19 @@ func TestValidateSizeConfig(t *testing.T) { MaxSize: -100, MinSize: 100, } - require.EqualError(t, cfg.Validate(), "max_size must be greater than or equal to zero") + require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to zero") cfg = SizeConfig{ - Sizer: SizerTypeBytes, + Sizer: SizerTypeItems, MaxSize: 100, MinSize: -100, } - require.EqualError(t, cfg.Validate(), "min_size must be greater than or equal to zero") + require.EqualError(t, cfg.Validate(), "`min_size` must be greater than or equal to zero") cfg = SizeConfig{ - Sizer: SizerTypeBytes, + Sizer: SizerTypeItems, MaxSize: 100, MinSize: 200, } - require.EqualError(t, cfg.Validate(), "max_size must be greater than or equal to mix_size") -} - -func TestSizeUnmarshaler(t *testing.T) { - var rawConf map[string]any - cfg := NewDefaultConfig() - - require.NoError(t, yaml.Unmarshal([]byte(`sizer: bytes`), &rawConf)) - require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg)) - require.NoError(t, 
cfg.Validate()) - - require.NoError(t, yaml.Unmarshal([]byte(`sizer: "bytes"`), &rawConf)) - require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg)) - require.NoError(t, cfg.Validate()) - - require.NoError(t, yaml.Unmarshal([]byte(`sizer: items`), &rawConf)) - require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg)) - require.NoError(t, cfg.Validate()) - - require.NoError(t, yaml.Unmarshal([]byte(`sizer: 'items'`), &rawConf)) - require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg)) - require.NoError(t, cfg.Validate()) - - require.NoError(t, yaml.Unmarshal([]byte(`sizer: invalid`), &rawConf)) - require.EqualError(t, - confmap.NewFromStringMap(rawConf).Unmarshal(&cfg), - "decoding failed due to the following error(s):\n\nerror decoding 'sizer': invalid sizer: \"invalid\"") + require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to min_size") } diff --git a/exporter/exporterbatcher/sizer_type.go b/exporter/exporterbatcher/sizer_type.go index 1b1b05a27693..718a0597ecbe 100644 --- a/exporter/exporterbatcher/sizer_type.go +++ b/exporter/exporterbatcher/sizer_type.go @@ -7,20 +7,21 @@ import ( "fmt" ) -type SizerType string +type SizerType struct { + val string +} const ( - SizerTypeItems SizerType = "items" - SizerTypeBytes SizerType = "bytes" + sizerTypeItems = "items" ) +var SizerTypeItems = SizerType{val: sizerTypeItems} + // UnmarshalText implements TextUnmarshaler interface.
func (s *SizerType) UnmarshalText(text []byte) error { switch str := string(text); str { - case string(SizerTypeItems): + case sizerTypeItems: *s = SizerTypeItems - case string(SizerTypeBytes): - *s = SizerTypeBytes default: return fmt.Errorf("invalid sizer: %q", str) } } diff --git a/exporter/exporterbatcher/sizer_type_test.go b/exporter/exporterbatcher/sizer_type_test.go new file mode 100644 index 000000000000..f5dc14816c86 --- /dev/null +++ b/exporter/exporterbatcher/sizer_type_test.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterbatcher + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "go.opentelemetry.io/collector/confmap" +) + +func TestSizeTypeUnmarshaler(t *testing.T) { + var rawConf map[string]any + cfg := NewDefaultConfig() + + require.NoError(t, yaml.Unmarshal([]byte(`sizer: items`), &rawConf)) + require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg)) + require.NoError(t, cfg.Validate()) + + require.NoError(t, yaml.Unmarshal([]byte(`sizer: 'items'`), &rawConf)) + require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg)) + require.NoError(t, cfg.Validate()) + + require.NoError(t, yaml.Unmarshal([]byte(`sizer: invalid`), &rawConf)) + require.ErrorContains(t, + confmap.NewFromStringMap(rawConf).Unmarshal(&cfg), + "decoding failed due to the following error(s):\n\nerror decoding 'sizer': invalid sizer: \"invalid\"") +} diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index e8b8ffc4e43e..cc44730f74b0 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -70,6 +70,10 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O } } + + if be.batcherCfg.MinSizeItems != nil || be.batcherCfg.MaxSizeItems != nil { + set.Logger.Warn("Use of deprecated fields `min_size_items`
and `max_size_items`") + } + // Consumer Sender is always initialized. be.firstSender = newSender(func(ctx context.Context, req request.Request) error { return req.Export(ctx) diff --git a/exporter/exporterhelper/internal/batcher/default_batcher.go b/exporter/exporterhelper/internal/batcher/default_batcher.go index 2fd9497d3230..f147b2c099ba 100644 --- a/exporter/exporterhelper/internal/batcher/default_batcher.go +++ b/exporter/exporterhelper/internal/batcher/default_batcher.go @@ -67,7 +67,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done var reqList []request.Request var mergeSplitErr error if qb.currentBatch == nil { - reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil) + reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.SizeConfig, nil) if mergeSplitErr != nil || len(reqList) == 0 { done.OnDone(mergeSplitErr) qb.currentBatchMu.Unlock() @@ -80,9 +80,9 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done } // We have at least one result in the reqList. Last in the list may not have enough data to be flushed. - // Find if it has at least MinSizeItems, and if it does then move that as the current batch. + // Find if it has at least MinSize, and if it does then move that as the current batch. lastReq := reqList[len(reqList)-1] - if lastReq.ItemsCount() < qb.batchCfg.MinSizeItems { + if lastReq.ItemsCount() < qb.batchCfg.MinSize { // Do not flush the last item and add it to the current batch. reqList = reqList[:len(reqList)-1] qb.currentBatch = &batch{ @@ -101,7 +101,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done return } - reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req) + reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.SizeConfig, req) // If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch. 
if mergeSplitErr != nil || len(reqList) == 0 { done.OnDone(mergeSplitErr) @@ -127,7 +127,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done // cannot unlock and re-lock because we are not done processing all the responses. var firstBatch *batch // Need to check the currentBatch if more than 1 result returned or if 1 result return but larger than MinSize. - if len(reqList) > 1 || qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems { + if len(reqList) > 1 || qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSize { firstBatch = qb.currentBatch qb.currentBatch = nil } @@ -137,7 +137,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done // If we still have results to process, then we need to check if the last result has enough data to flush, or we add it to the currentBatch. if len(reqList) > 0 { lastReq := reqList[len(reqList)-1] - if lastReq.ItemsCount() < qb.batchCfg.MinSizeItems { + if lastReq.ItemsCount() < qb.batchCfg.MinSize { // Do not flush the last item and add it to the current batch. 
reqList = reqList[:len(reqList)-1] qb.currentBatch = &batch{ diff --git a/exporter/exporterhelper/internal/batcher/default_batcher_test.go b/exporter/exporterhelper/internal/batcher/default_batcher_test.go index d028c020a11f..cfdbd409669c 100644 --- a/exporter/exporterhelper/internal/batcher/default_batcher_test.go +++ b/exporter/exporterhelper/internal/batcher/default_batcher_test.go @@ -39,8 +39,9 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = true cfg.FlushTimeout = 0 - cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ - MinSizeItems: 0, + cfg.SizeConfig = exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: 0, } ba, err := NewBatcher(cfg, @@ -89,8 +90,9 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = true cfg.FlushTimeout = 0 - cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ - MinSizeItems: 10, + cfg.SizeConfig = exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: 10, } ba, err := NewBatcher(cfg, @@ -154,8 +156,9 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = true cfg.FlushTimeout = 50 * time.Millisecond - cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ - MinSizeItems: 100, + cfg.SizeConfig = exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: 100, } ba, err := NewBatcher(cfg, @@ -211,11 +214,10 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = true cfg.FlushTimeout = 0 - cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ - MinSizeItems: 100, - } - cfg.MaxSizeConfig = exporterbatcher.MaxSizeConfig{ - MaxSizeItems: 100, + cfg.SizeConfig = exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: 100, + MaxSize: 100, } ba, err := NewBatcher(cfg, @@ 
-264,7 +266,7 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) { func TestDefaultBatcher_Shutdown(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 + batchCfg.MinSize = 10 batchCfg.FlushTimeout = 100 * time.Second ba, err := NewBatcher(batchCfg, @@ -293,8 +295,8 @@ func TestDefaultBatcher_Shutdown(t *testing.T) { func TestDefaultBatcher_MergeError(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 5 - batchCfg.MaxSizeItems = 7 + batchCfg.MinSize = 5 + batchCfg.MaxSize = 7 ba, err := NewBatcher(batchCfg, func(ctx context.Context, req request.Request) error { return req.Export(ctx) }, diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 927af0ad27dc..df48a739bcfd 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -285,7 +285,7 @@ func TestQueueBatcher_Merge(t *testing.T) { name: "split_disabled", batchCfg: func() exporterbatcher.Config { cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 + cfg.MinSize = 10 cfg.FlushTimeout = 100 * time.Millisecond return cfg }(), @@ -294,9 +294,9 @@ func TestQueueBatcher_Merge(t *testing.T) { name: "split_high_limit", batchCfg: func() exporterbatcher.Config { cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 + cfg.MinSize = 10 cfg.FlushTimeout = 100 * time.Millisecond - cfg.MaxSizeItems = 1000 + cfg.MaxSize = 1000 return cfg }(), }, @@ -353,7 +353,7 @@ func TestQueueBatcher_BatchExportError(t *testing.T) { name: "merge_only", batchCfg: func() exporterbatcher.Config { cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 + cfg.MinSize = 10 return cfg }(), }, @@ -361,8 +361,8 @@ func TestQueueBatcher_BatchExportError(t *testing.T) { name: "merge_without_split_triggered", batchCfg: func() exporterbatcher.Config { cfg := 
exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - cfg.MaxSizeItems = 200 + cfg.MinSize = 10 + cfg.MaxSize = 200 return cfg }(), }, @@ -370,8 +370,8 @@ func TestQueueBatcher_BatchExportError(t *testing.T) { name: "merge_with_split_triggered", batchCfg: func() exporterbatcher.Config { cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - cfg.MaxSizeItems = 20 + cfg.MinSize = 10 + cfg.MaxSize = 20 return cfg }(), expectedRequests: 1, @@ -410,8 +410,8 @@ func TestQueueBatcher_BatchExportError(t *testing.T) { func TestQueueBatcher_MergeOrSplit(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 5 - batchCfg.MaxSizeItems = 10 + batchCfg.MinSize = 5 + batchCfg.MaxSize = 10 batchCfg.FlushTimeout = 100 * time.Millisecond be, err := newQueueBatcherExporter(exporterqueue.NewDefaultConfig(), batchCfg) require.NoError(t, err) @@ -446,7 +446,7 @@ func TestQueueBatcher_MergeOrSplit(t *testing.T) { func TestQueueBatcher_Shutdown(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 + batchCfg.MinSize = 10 be, err := newQueueBatcherExporter(exporterqueue.NewDefaultConfig(), batchCfg) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -466,7 +466,7 @@ func TestQueueBatcher_Shutdown(t *testing.T) { func TestQueueBatcher_BatchBlocking(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 3 + bCfg.MinSize = 3 be, err := newQueueBatcherExporter(exporterqueue.Config{}, bCfg) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -493,7 +493,7 @@ func TestQueueBatcher_BatchBlocking(t *testing.T) { // Validate that the batch is cancelled once the first request in the request is cancelled func TestQueueBatcher_BatchCancelled(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 + bCfg.MinSize = 2 be, err := 
newQueueBatcherExporter(exporterqueue.Config{}, bCfg) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -525,7 +525,7 @@ func TestQueueBatcher_BatchCancelled(t *testing.T) { func TestQueueBatcher_DrainActiveRequests(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 + bCfg.MinSize = 2 be, err := newQueueBatcherExporter(exporterqueue.Config{}, bCfg) require.NoError(t, err) @@ -564,7 +564,7 @@ func TestQueueBatcherUnstartedShutdown(t *testing.T) { func TestQueueBatcherWithTimeout(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 10 + bCfg.MinSize = 10 be, err := newQueueBatcherExporter(exporterqueue.Config{}, bCfg) require.NoError(t, err) @@ -605,7 +605,7 @@ func TestQueueBatcherWithTimeout(t *testing.T) { func TestQueueBatcherTimerResetNoConflict(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 + bCfg.MinSize = 8 bCfg.FlushTimeout = 100 * time.Millisecond be, err := newQueueBatcherExporter(exporterqueue.Config{}, bCfg) require.NoError(t, err) @@ -635,7 +635,7 @@ func TestQueueBatcherTimerFlush(t *testing.T) { t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") } bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 + bCfg.MinSize = 8 bCfg.FlushTimeout = 100 * time.Millisecond be, err := newQueueBatcherExporter(exporterqueue.Config{}, bCfg) require.NoError(t, err) diff --git a/exporter/exporterhelper/internal/request/request.go b/exporter/exporterhelper/internal/request/request.go index ab4253a34527..3283f01b6be7 100644 --- a/exporter/exporterhelper/internal/request/request.go +++ b/exporter/exporterhelper/internal/request/request.go @@ -28,7 +28,7 @@ type Request interface { // marked as not mutable. The length of the returned slice MUST not be 0. 
// Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. - MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, Request) ([]Request, error) + MergeSplit(context.Context, exporterbatcher.SizeConfig, Request) ([]Request, error) } // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial diff --git a/exporter/exporterhelper/internal/requesttest/fake_request.go b/exporter/exporterhelper/internal/requesttest/fake_request.go index 33d7a648cb08..ae1dd450fa6d 100644 --- a/exporter/exporterhelper/internal/requesttest/fake_request.go +++ b/exporter/exporterhelper/internal/requesttest/fake_request.go @@ -98,12 +98,12 @@ func (r *FakeRequest) ItemsCount() int { return r.Items } -func (r *FakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 request.Request) ([]request.Request, error) { +func (r *FakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 request.Request) ([]request.Request, error) { if r.MergeErr != nil { return nil, r.MergeErr } - maxItems := cfg.MaxSizeItems + maxItems := cfg.MaxSize if maxItems == 0 { if r2 != nil { fr2 := r2.(*FakeRequest) diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 2a3f753e61fb..20a404485f05 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -165,7 +165,7 @@ func (mer *mockErrorRequest) ItemsCount() int { return 7 } -func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, request.Request) ([]request.Request, error) { +func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.SizeConfig, request.Request) ([]request.Request, error) { return nil, nil } diff --git 
a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 3618182d466b..0d97239a31b9 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -13,7 +13,7 @@ import ( // MergeSplit splits and/or merges the provided logs request and the current request into one or more requests // conforming with the MaxSizeConfig. -func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { +func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) { if r2 != nil { req2, ok := r2.(*logsRequest) if !ok { @@ -23,7 +23,7 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz } // If no limit we can simply merge the new request into the current and return. - if cfg.MaxSizeItems == 0 { + if cfg.MaxSize == 0 { return []Request{req}, nil } return req.split(cfg) @@ -35,10 +35,10 @@ func (req *logsRequest) mergeTo(dst *logsRequest) { req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs()) } -func (req *logsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) { +func (req *logsRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) { var res []Request - for req.ItemsCount() > cfg.MaxSizeItems { - ld := extractLogs(req.ld, cfg.MaxSizeItems) + for req.ItemsCount() > cfg.MaxSize { + ld := extractLogs(req.ld, cfg.MaxSize) size := ld.LogRecordCount() req.setCachedItemsCount(req.ItemsCount() - size) res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size}) diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index c2267f0a5167..d4ae3a68c45c 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -19,7 +19,7 @@ import ( func TestMergeLogs(t *testing.T) { lr1 := newLogsRequest(testdata.GenerateLogs(2), nil) lr2 := 
newLogsRequest(testdata.GenerateLogs(3), nil) - res, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2) + res, err := lr1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, lr2) require.NoError(t, err) require.Equal(t, 5, res[0].ItemsCount()) } @@ -27,42 +27,42 @@ func TestMergeLogs(t *testing.T) { func TestMergeLogsInvalidInput(t *testing.T) { lr1 := newTracesRequest(testdata.GenerateTraces(2), nil) lr2 := newLogsRequest(testdata.GenerateLogs(3), nil) - _, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2) + _, err := lr1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, lr2) require.Error(t, err) } func TestMergeSplitLogs(t *testing.T) { tests := []struct { name string - cfg exporterbatcher.MaxSizeConfig + cfg exporterbatcher.SizeConfig lr1 request.Request lr2 request.Request expected []Request }{ { name: "both_requests_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, lr1: newLogsRequest(plog.NewLogs(), nil), lr2: newLogsRequest(plog.NewLogs(), nil), expected: []Request{newLogsRequest(plog.NewLogs(), nil)}, }, { name: "first_request_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, lr1: newLogsRequest(plog.NewLogs(), nil), lr2: newLogsRequest(testdata.GenerateLogs(5), nil), expected: []Request{newLogsRequest(testdata.GenerateLogs(5), nil)}, }, { name: "first_empty_second_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, lr1: newLogsRequest(plog.NewLogs(), nil), lr2: nil, expected: []Request{newLogsRequest(plog.NewLogs(), nil)}, }, { name: "merge_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + 
cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, lr1: newLogsRequest(testdata.GenerateLogs(4), nil), lr2: newLogsRequest(testdata.GenerateLogs(6), nil), expected: []Request{newLogsRequest(func() plog.Logs { @@ -73,7 +73,7 @@ func TestMergeSplitLogs(t *testing.T) { }, { name: "split_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 4}, lr1: newLogsRequest(plog.NewLogs(), nil), lr2: newLogsRequest(testdata.GenerateLogs(10), nil), expected: []Request{ @@ -84,7 +84,7 @@ func TestMergeSplitLogs(t *testing.T) { }, { name: "merge_and_split", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, lr1: newLogsRequest(testdata.GenerateLogs(8), nil), lr2: newLogsRequest(testdata.GenerateLogs(20), nil), expected: []Request{ @@ -99,7 +99,7 @@ func TestMergeSplitLogs(t *testing.T) { }, { name: "scope_logs_split", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 4}, lr1: newLogsRequest(func() plog.Logs { ld := testdata.GenerateLogs(4) ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log") @@ -132,7 +132,7 @@ func TestMergeSplitLogs(t *testing.T) { func TestMergeSplitLogsInputNotModifiedIfErrorReturned(t *testing.T) { r1 := newLogsRequest(testdata.GenerateLogs(18), nil) r2 := newTracesRequest(testdata.GenerateTraces(3), nil) - _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, r2) require.Error(t, err) assert.Equal(t, 18, r1.ItemsCount()) } @@ -140,7 +140,7 @@ func TestMergeSplitLogsInputNotModifiedIfErrorReturned(t *testing.T) { func 
TestMergeSplitLogsInvalidInput(t *testing.T) { r1 := newTracesRequest(testdata.GenerateTraces(2), nil) r2 := newLogsRequest(testdata.GenerateLogs(3), nil) - _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{}, r2) require.Error(t, err) } @@ -155,7 +155,7 @@ func TestExtractLogs(t *testing.T) { func TestMergeSplitManySmallLogs(t *testing.T) { // All requests merge into a single batch. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000} merged := []Request{newLogsRequest(testdata.GenerateLogs(1), nil)} for j := 0; j < 1000; j++ { lr2 := newLogsRequest(testdata.GenerateLogs(10), nil) @@ -167,7 +167,7 @@ func TestMergeSplitManySmallLogs(t *testing.T) { func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { // All requests merge into a single batch. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10010} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10010} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newLogsRequest(testdata.GenerateLogs(10), nil)} @@ -182,7 +182,7 @@ func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) { // Every incoming request results in a split. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)} @@ -197,7 +197,7 @@ func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) { // One request splits into many batches. 
- cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)} diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 390517c69434..8ded9fa36de9 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -13,7 +13,7 @@ import ( // MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests // conforming with the MaxSizeConfig. -func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { +func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) { if r2 != nil { req2, ok := r2.(*metricsRequest) if !ok { @@ -23,7 +23,7 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max } // If no limit we can simply merge the new request into the current and return. 
- if cfg.MaxSizeItems == 0 { + if cfg.MaxSize == 0 { return []Request{req}, nil } return req.split(cfg) @@ -35,10 +35,10 @@ func (req *metricsRequest) mergeTo(dst *metricsRequest) { req.md.ResourceMetrics().MoveAndAppendTo(dst.md.ResourceMetrics()) } -func (req *metricsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) { +func (req *metricsRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) { var res []Request - for req.ItemsCount() > cfg.MaxSizeItems { - md := extractMetrics(req.md, cfg.MaxSizeItems) + for req.ItemsCount() > cfg.MaxSize { + md := extractMetrics(req.md, cfg.MaxSize) size := md.DataPointCount() req.setCachedItemsCount(req.ItemsCount() - size) res = append(res, &metricsRequest{md: md, pusher: req.pusher, cachedItemsCount: size}) diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index 5969e0f75c80..385d125d9835 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -18,7 +18,7 @@ import ( func TestMergeMetrics(t *testing.T) { mr1 := newMetricsRequest(testdata.GenerateMetrics(2), nil) mr2 := newMetricsRequest(testdata.GenerateMetrics(3), nil) - res, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2) + res, err := mr1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, mr2) require.NoError(t, err) // Every metric has 2 data points. 
assert.Equal(t, 2*5, res[0].ItemsCount()) @@ -27,42 +27,42 @@ func TestMergeMetrics(t *testing.T) { func TestMergeMetricsInvalidInput(t *testing.T) { mr1 := newTracesRequest(testdata.GenerateTraces(2), nil) mr2 := newMetricsRequest(testdata.GenerateMetrics(3), nil) - _, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2) + _, err := mr1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, mr2) require.Error(t, err) } func TestMergeSplitMetrics(t *testing.T) { tests := []struct { name string - cfg exporterbatcher.MaxSizeConfig + cfg exporterbatcher.SizeConfig mr1 Request mr2 Request expected []Request }{ { name: "both_requests_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, mr1: newMetricsRequest(pmetric.NewMetrics(), nil), mr2: newMetricsRequest(pmetric.NewMetrics(), nil), expected: []Request{newMetricsRequest(pmetric.NewMetrics(), nil)}, }, { name: "first_request_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, mr1: newMetricsRequest(pmetric.NewMetrics(), nil), mr2: newMetricsRequest(testdata.GenerateMetrics(5), nil), expected: []Request{newMetricsRequest(testdata.GenerateMetrics(5), nil)}, }, { name: "first_empty_second_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, mr1: newMetricsRequest(pmetric.NewMetrics(), nil), mr2: nil, expected: []Request{newMetricsRequest(pmetric.NewMetrics(), nil)}, }, { name: "merge_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 60}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 60}, mr1: newMetricsRequest(testdata.GenerateMetrics(10), nil), mr2: newMetricsRequest(testdata.GenerateMetrics(14), nil), expected: 
[]Request{newMetricsRequest(func() pmetric.Metrics { @@ -73,7 +73,7 @@ func TestMergeSplitMetrics(t *testing.T) { }, { name: "split_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 14}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 14}, mr1: newMetricsRequest(pmetric.NewMetrics(), nil), mr2: newMetricsRequest(testdata.GenerateMetrics(15), nil), // 15 metrics, 30 data points expected: []Request{ @@ -84,7 +84,7 @@ func TestMergeSplitMetrics(t *testing.T) { }, { name: "split_and_merge", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 28}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 28}, mr1: newMetricsRequest(testdata.GenerateMetrics(7), nil), // 7 metrics, 14 data points mr2: newMetricsRequest(testdata.GenerateMetrics(25), nil), // 25 metrics, 50 data points expected: []Request{ @@ -99,7 +99,7 @@ func TestMergeSplitMetrics(t *testing.T) { }, { name: "scope_metrics_split", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 8}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 8}, mr1: newMetricsRequest(func() pmetric.Metrics { md := testdata.GenerateMetrics(4) extraScopeMetrics := md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() @@ -133,7 +133,7 @@ func TestMergeSplitMetrics(t *testing.T) { func TestMergeSplitMetricsInputNotModifiedIfErrorReturned(t *testing.T) { r1 := newMetricsRequest(testdata.GenerateMetrics(18), nil) // 18 metrics, 36 data points r2 := newLogsRequest(testdata.GenerateLogs(3), nil) - _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, r2) require.Error(t, err) assert.Equal(t, 36, r1.ItemsCount()) } @@ -141,7 +141,7 @@ func TestMergeSplitMetricsInputNotModifiedIfErrorReturned(t *testing.T) { func TestMergeSplitMetricsInvalidInput(t *testing.T) { r1 := 
newTracesRequest(testdata.GenerateTraces(2), nil) r2 := newMetricsRequest(testdata.GenerateMetrics(3), nil) - _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, r2) require.Error(t, err) } @@ -163,7 +163,7 @@ func TestExtractMetricsInvalidMetric(t *testing.T) { func TestMergeSplitManySmallMetrics(t *testing.T) { // All requests merge into a single batch. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 20000} merged := []Request{newMetricsRequest(testdata.GenerateMetrics(1), nil)} for j := 0; j < 1000; j++ { lr2 := newMetricsRequest(testdata.GenerateMetrics(10), nil) @@ -175,7 +175,7 @@ func TestMergeSplitManySmallMetrics(t *testing.T) { func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) { // All requests merge into a single batch. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20020} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 20020} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newMetricsRequest(testdata.GenerateMetrics(10), nil)} @@ -190,7 +190,7 @@ func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) { func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing.B) { // Every incoming request results in a split. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 20000} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newMetricsRequest(testdata.GenerateMetrics(0), nil)} @@ -205,7 +205,7 @@ func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing. 
func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) { // One request splits into many batches. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 20000} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newMetricsRequest(testdata.GenerateMetrics(0), nil)} diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index 91b936ec611e..c7cf43c6b1d5 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -13,7 +13,7 @@ import ( // MergeSplit splits and/or merges the provided traces request and the current request into one or more requests // conforming with the MaxSizeConfig. -func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { +func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) { if r2 != nil { req2, ok := r2.(*tracesRequest) if !ok { @@ -23,7 +23,7 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS } // If no limit we can simply merge the new request into the current and return. 
- if cfg.MaxSizeItems == 0 { + if cfg.MaxSize == 0 { return []Request{req}, nil } return req.split(cfg) @@ -35,10 +35,10 @@ func (req *tracesRequest) mergeTo(dst *tracesRequest) { req.td.ResourceSpans().MoveAndAppendTo(dst.td.ResourceSpans()) } -func (req *tracesRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) { +func (req *tracesRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) { var res []Request - for req.ItemsCount() > cfg.MaxSizeItems { - td := extractTraces(req.td, cfg.MaxSizeItems) + for req.ItemsCount() > cfg.MaxSize { + td := extractTraces(req.td, cfg.MaxSize) size := td.SpanCount() req.setCachedItemsCount(req.ItemsCount() - size) res = append(res, &tracesRequest{td: td, pusher: req.pusher, cachedItemsCount: size}) diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index 7444273260e8..2dc8f3b4f5ff 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -18,7 +18,7 @@ import ( func TestMergeTraces(t *testing.T) { tr1 := newTracesRequest(testdata.GenerateTraces(2), nil) tr2 := newTracesRequest(testdata.GenerateTraces(3), nil) - res, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2) + res, err := tr1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, tr2) require.NoError(t, err) assert.Equal(t, 5, res[0].ItemsCount()) } @@ -26,49 +26,49 @@ func TestMergeTraces(t *testing.T) { func TestMergeTracesInvalidInput(t *testing.T) { tr1 := newLogsRequest(testdata.GenerateLogs(2), nil) tr2 := newTracesRequest(testdata.GenerateTraces(3), nil) - _, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2) + _, err := tr1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, tr2) require.Error(t, err) } func TestMergeSplitTraces(t *testing.T) { tests := []struct { name string - 
cfg exporterbatcher.MaxSizeConfig + cfg exporterbatcher.SizeConfig tr1 Request tr2 Request expected []Request }{ { name: "both_requests_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, tr1: newTracesRequest(ptrace.NewTraces(), nil), tr2: newTracesRequest(ptrace.NewTraces(), nil), expected: []Request{newTracesRequest(ptrace.NewTraces(), nil)}, }, { name: "first_request_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, tr1: newTracesRequest(ptrace.NewTraces(), nil), tr2: newTracesRequest(testdata.GenerateTraces(5), nil), expected: []Request{newTracesRequest(testdata.GenerateTraces(5), nil)}, }, { name: "second_request_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, tr1: newTracesRequest(testdata.GenerateTraces(5), nil), tr2: newTracesRequest(ptrace.NewTraces(), nil), expected: []Request{newTracesRequest(testdata.GenerateTraces(5), nil)}, }, { name: "first_empty_second_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, tr1: newTracesRequest(ptrace.NewTraces(), nil), tr2: nil, expected: []Request{newTracesRequest(ptrace.NewTraces(), nil)}, }, { name: "merge_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, tr1: newTracesRequest(testdata.GenerateTraces(5), nil), tr2: newTracesRequest(testdata.GenerateTraces(5), nil), expected: []Request{newTracesRequest(func() ptrace.Traces { @@ -79,7 +79,7 @@ func TestMergeSplitTraces(t *testing.T) { }, { name: "split_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + cfg: exporterbatcher.SizeConfig{Sizer: 
exporterbatcher.SizerTypeItems, MaxSize: 4}, tr1: newTracesRequest(ptrace.NewTraces(), nil), tr2: newTracesRequest(testdata.GenerateTraces(10), nil), expected: []Request{ @@ -90,7 +90,7 @@ func TestMergeSplitTraces(t *testing.T) { }, { name: "split_and_merge", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, tr1: newTracesRequest(testdata.GenerateTraces(4), nil), tr2: newTracesRequest(testdata.GenerateTraces(20), nil), expected: []Request{ @@ -105,7 +105,7 @@ func TestMergeSplitTraces(t *testing.T) { }, { name: "scope_spans_split", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, tr1: newTracesRequest(func() ptrace.Traces { td := testdata.GenerateTraces(10) extraScopeTraces := testdata.GenerateTraces(5) @@ -139,7 +139,7 @@ func TestMergeSplitTraces(t *testing.T) { func TestMergeSplitTracesInputNotModifiedIfErrorReturned(t *testing.T) { r1 := newTracesRequest(testdata.GenerateTraces(18), nil) r2 := newLogsRequest(testdata.GenerateLogs(3), nil) - _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, r2) require.Error(t, err) assert.Equal(t, 18, r1.ItemsCount()) } @@ -147,7 +147,7 @@ func TestMergeSplitTracesInputNotModifiedIfErrorReturned(t *testing.T) { func TestMergeSplitTracesInvalidInput(t *testing.T) { r1 := newTracesRequest(testdata.GenerateTraces(2), nil) r2 := newMetricsRequest(testdata.GenerateMetrics(3), nil) - _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, r2) require.Error(t, err) } @@ -161,7 +161,7 @@ func 
TestExtractTraces(t *testing.T) { } func TestMergeSplitManySmallTraces(t *testing.T) { - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000} merged := []Request{newTracesRequest(testdata.GenerateTraces(1), nil)} for j := 0; j < 1000; j++ { lr2 := newTracesRequest(testdata.GenerateTraces(10), nil) @@ -173,7 +173,7 @@ func TestMergeSplitManySmallTraces(t *testing.T) { func BenchmarkSplittingBasedOnItemCountManySmallTraces(b *testing.B) { // All requests merge into a single batch. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10010} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10010} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newTracesRequest(testdata.GenerateTraces(10), nil)} @@ -188,7 +188,7 @@ func BenchmarkSplittingBasedOnItemCountManySmallTraces(b *testing.B) { func BenchmarkSplittingBasedOnItemCountManyTracesSlightlyAboveLimit(b *testing.B) { // Every incoming request results in a split. - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newTracesRequest(testdata.GenerateTraces(0), nil)} @@ -203,7 +203,7 @@ func BenchmarkSplittingBasedOnItemCountManyTracesSlightlyAboveLimit(b *testing.B func BenchmarkSplittingBasedOnItemCountHugeTraces(b *testing.B) { // One request splits into many batches. 
- cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newTracesRequest(testdata.GenerateTraces(0), nil)} diff --git a/exporter/exporterhelper/xexporterhelper/go.mod b/exporter/exporterhelper/xexporterhelper/go.mod index 575d3b5ee1bb..ad7707985337 100644 --- a/exporter/exporterhelper/xexporterhelper/go.mod +++ b/exporter/exporterhelper/xexporterhelper/go.mod @@ -29,14 +29,21 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/confmap v1.26.0 // indirect go.opentelemetry.io/collector/extension v0.120.0 // indirect go.opentelemetry.io/collector/extension/xextension v0.120.0 // indirect go.opentelemetry.io/collector/featuregate v1.26.0 // indirect diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index 6efd95d82c36..0dfa288cb232 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -13,7 +13,7 @@ import ( 
) // MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. -func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { +func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { if r2 != nil { req2, ok := r2.(*profilesRequest) if !ok { @@ -23,7 +23,7 @@ func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Ma } // If no limit we can simply merge the new request into the current and return. - if cfg.MaxSizeItems == 0 { + if cfg.MaxSize == 0 { return []exporterhelper.Request{req}, nil } return req.split(cfg) @@ -35,10 +35,10 @@ func (req *profilesRequest) mergeTo(dst *profilesRequest) { req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles()) } -func (req *profilesRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]exporterhelper.Request, error) { +func (req *profilesRequest) split(cfg exporterbatcher.SizeConfig) ([]exporterhelper.Request, error) { var res []exporterhelper.Request - for req.ItemsCount() > cfg.MaxSizeItems { - pd := extractProfiles(req.pd, cfg.MaxSizeItems) + for req.ItemsCount() > cfg.MaxSize { + pd := extractProfiles(req.pd, cfg.MaxSize) size := pd.SampleCount() req.setCachedItemsCount(req.ItemsCount() - size) res = append(res, &profilesRequest{pd: pd, pusher: req.pusher, cachedItemsCount: size}) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index ff2a8e725e60..ff1435c16274 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -20,7 +20,7 @@ import ( func TestMergeProfiles(t *testing.T) { pr1 := newProfilesRequest(testdata.GenerateProfiles(2), nil) pr2 := 
newProfilesRequest(testdata.GenerateProfiles(3), nil) - res, err := pr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr2) + res, err := pr1.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, pr2) require.NoError(t, err) assert.Len(t, res, 1) assert.Equal(t, 5, res[0].ItemsCount()) @@ -28,42 +28,42 @@ func TestMergeProfiles(t *testing.T) { func TestMergeProfilesInvalidInput(t *testing.T) { pr2 := newProfilesRequest(testdata.GenerateProfiles(3), nil) - _, err := pr2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, &requesttest.FakeRequest{Items: 1}) + _, err := pr2.MergeSplit(context.Background(), exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems}, &requesttest.FakeRequest{Items: 1}) assert.Error(t, err) } func TestMergeSplitProfiles(t *testing.T) { tests := []struct { name string - cfg exporterbatcher.MaxSizeConfig + cfg exporterbatcher.SizeConfig pr1 exporterhelper.Request pr2 exporterhelper.Request expected []exporterhelper.Request }{ { name: "both_requests_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, pr1: newProfilesRequest(pprofile.NewProfiles(), nil), pr2: newProfilesRequest(pprofile.NewProfiles(), nil), expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)}, }, { name: "first_request_empty", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, pr1: newProfilesRequest(pprofile.NewProfiles(), nil), pr2: newProfilesRequest(testdata.GenerateProfiles(5), nil), expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5), nil)}, }, { name: "first_empty_second_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, pr1: 
newProfilesRequest(pprofile.NewProfiles(), nil), pr2: nil, expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles(), nil)}, }, { name: "merge_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, pr1: newProfilesRequest(testdata.GenerateProfiles(4), nil), pr2: newProfilesRequest(testdata.GenerateProfiles(6), nil), expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { @@ -74,7 +74,7 @@ func TestMergeSplitProfiles(t *testing.T) { }, { name: "split_only", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 4}, pr1: newProfilesRequest(testdata.GenerateProfiles(10), nil), pr2: nil, expected: []exporterhelper.Request{ @@ -85,7 +85,7 @@ func TestMergeSplitProfiles(t *testing.T) { }, { name: "merge_and_split", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10}, pr1: newProfilesRequest(testdata.GenerateProfiles(8), nil), pr2: newProfilesRequest(testdata.GenerateProfiles(20), nil), expected: []exporterhelper.Request{ @@ -100,7 +100,7 @@ func TestMergeSplitProfiles(t *testing.T) { }, { name: "scope_profiles_split", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 4}, pr1: newProfilesRequest(func() pprofile.Profiles { return testdata.GenerateProfiles(6) }(), nil), @@ -136,7 +136,7 @@ func TestExtractProfiles(t *testing.T) { func TestMergeSplitManySmallLogs(t *testing.T) { // All requests merge into a single batch. 
- cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000} merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(1), nil)} for j := 0; j < 1000; j++ { lr2 := newProfilesRequest(testdata.GenerateProfiles(10), nil) diff --git a/exporter/exportertest/go.mod b/exporter/exportertest/go.mod index 60187e1abcef..af6b41be8be9 100644 --- a/exporter/exportertest/go.mod +++ b/exporter/exportertest/go.mod @@ -26,13 +26,20 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/confmap v1.26.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.120.0 // indirect go.opentelemetry.io/collector/extension v0.120.0 // indirect go.opentelemetry.io/collector/extension/xextension v0.120.0 // indirect diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 099d67958a4d..50706d655afe 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -37,6 +37,7 @@ func TestUnmarshalConfig(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() require.NoError(t, 
cm.Unmarshal(&cfg)) + require.NoError(t, xconfmap.Validate(&cfg)) assert.Equal(t, &Config{ TimeoutConfig: exporterhelper.TimeoutConfig{ @@ -58,11 +59,10 @@ func TestUnmarshalConfig(t *testing.T) { BatcherConfig: exporterbatcher.Config{ Enabled: true, FlushTimeout: 200 * time.Millisecond, - MinSizeConfig: exporterbatcher.MinSizeConfig{ - MinSizeItems: 1000, - }, - MaxSizeConfig: exporterbatcher.MaxSizeConfig{ - MaxSizeItems: 10000, + SizeConfig: exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeItems, + MinSize: 1000, + MaxSize: 10000, }, }, ClientConfig: configgrpc.ClientConfig{ diff --git a/exporter/otlpexporter/testdata/config.yaml b/exporter/otlpexporter/testdata/config.yaml index d26631053e14..ed9a520ea679 100644 --- a/exporter/otlpexporter/testdata/config.yaml +++ b/exporter/otlpexporter/testdata/config.yaml @@ -17,8 +17,9 @@ retry_on_failure: batcher: enabled: true flush_timeout: 200ms - min_size_items: 1000 - max_size_items: 10000 + sizer: "items" + min_size: 1000 + max_size: 10000 auth: authenticator: nop headers: