Skip to content

Commit

Permalink
Start using the new SizeConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 25, 2025
1 parent 5812712 commit b000208
Show file tree
Hide file tree
Showing 26 changed files with 305 additions and 212 deletions.
25 changes: 25 additions & 0 deletions .chloggen/finishsize.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate `min_size_items` and `max_size_items` in favor of `min_size` and `max_size`.

# One or more tracking issues or pull requests related to the change
issues: [12486]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
25 changes: 25 additions & 0 deletions .chloggen/finishsize_breaking.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update MergeSplit function signature to use the new SizeConfig

# One or more tracking issues or pull requests related to the change
issues: [12486]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
25 changes: 25 additions & 0 deletions .chloggen/finishsize_deprecation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate MinSizeConfig and MaxSizeItems.

# One or more tracking issues or pull requests related to the change
issues: [12486]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
68 changes: 37 additions & 31 deletions exporter/exporterbatcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ package exporterbatcher // import "go.opentelemetry.io/collector/exporter/export
import (
"errors"
"time"

"go.opentelemetry.io/collector/confmap"
)

// Config defines a configuration for batching requests based on a timeout and a minimum number of items.
// MaxSizeItems defines batch splitting functionality if it's more than zero.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type Config struct {
Expand All @@ -19,66 +20,70 @@ type Config struct {
// FlushTimeout sets the time after which a batch will be sent regardless of its size.
FlushTimeout time.Duration `mapstructure:"flush_timeout"`

// SizeConfig sets the size limits for a batch.
SizeConfig `mapstructure:",squash"`

// Deprecated. Ignored if SizeConfig is set.
// Deprecated: [v0.121.0] Ignored if SizeConfig is set.
MinSizeConfig `mapstructure:",squash"`
// Deprecated. Ignored if SizeConfig is set.
// Deprecated: [v0.121.0] Ignored if SizeConfig is set.
MaxSizeConfig `mapstructure:",squash"`
}

// SizeConfig sets the size limits for a batch.
type SizeConfig struct {
Sizer SizerType `mapstructure:"sizer"`

MinSize int `mapstructure:"mix_size"`
// MinSize defines the configuration for the minimum size of a batch.
MinSize int `mapstructure:"min_size"`
// MaxSize defines the configuration for the maximum size of a batch.
MaxSize int `mapstructure:"max_size"`
}

// MinSizeConfig defines the configuration for the minimum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
// Deprecated: [v0.121.0] use SizeConfig.
type MinSizeConfig struct {
// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
// Deprecated: [v0.121.0] use SizeConfig.MinSize.
MinSizeItems *int `mapstructure:"min_size_items"`
}

// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
// Deprecated: [v0.121.0] use SizeConfig.
type MaxSizeConfig struct {
// MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP.
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
// Deprecated: [v0.121.0] use SizeConfig.MaxSize.
MaxSizeItems *int `mapstructure:"max_size_items"`
}

func (c Config) Validate() error {
if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
func (c *Config) Unmarshal(conf *confmap.Conf) error {
if err := conf.Unmarshal(c); err != nil {
return err
}
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")

if c.MinSizeItems != nil {
c.SizeConfig.MinSize = *c.MinSizeItems

Check warning on line 60 in exporter/exporterbatcher/config.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterbatcher/config.go#L60

Added line #L60 was not covered by tests
}
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")

if c.MaxSizeItems != nil {
c.SizeConfig.MaxSize = *c.MaxSizeItems

Check warning on line 64 in exporter/exporterbatcher/config.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterbatcher/config.go#L64

Added line #L64 was not covered by tests
}

return nil
}

func (c *Config) Validate() error {
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
return errors.New("`flush_timeout` must be greater than zero")
}

return nil
}

func (c SizeConfig) Validate() error {
if c.MinSize < 0 {
return errors.New("min_size must be greater than or equal to zero")
return errors.New("`min_size` must be greater than or equal to zero")
}
if c.MaxSize < 0 {
return errors.New("max_size must be greater than or equal to zero")
return errors.New("`max_size` must be greater than or equal to zero")
}
if c.MaxSize != 0 && c.MaxSize < c.MinSize {
return errors.New("max_size must be greater than or equal to mix_size")
return errors.New("`max_size` must be greater than or equal to mix_size")
}
return nil
}
Expand All @@ -87,8 +92,9 @@ func NewDefaultConfig() Config {
return Config{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
MinSizeConfig: MinSizeConfig{
MinSizeItems: 8192,
SizeConfig: SizeConfig{
Sizer: SizerTypeItems,
MinSize: 8192,
},
}
}
53 changes: 6 additions & 47 deletions exporter/exporterbatcher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,16 @@ package exporterbatcher
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"go.opentelemetry.io/collector/confmap"
)

func TestValidateConfig(t *testing.T) {
cfg := NewDefaultConfig()
require.NoError(t, cfg.Validate())

cfg.MinSizeItems = -1
require.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.FlushTimeout = 0
require.EqualError(t, cfg.Validate(), "timeout must be greater than zero")

cfg.MaxSizeItems = -1
require.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.MaxSizeItems = 20000
cfg.MinSizeItems = 20001
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
require.EqualError(t, cfg.Validate(), "`flush_timeout` must be greater than zero")
}

func TestValidateSizeConfig(t *testing.T) {
Expand All @@ -39,45 +24,19 @@ func TestValidateSizeConfig(t *testing.T) {
MaxSize: -100,
MinSize: 100,
}
require.EqualError(t, cfg.Validate(), "max_size must be greater than or equal to zero")
require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to zero")

cfg = SizeConfig{
Sizer: SizerTypeBytes,
Sizer: SizerTypeItems,
MaxSize: 100,
MinSize: -100,
}
require.EqualError(t, cfg.Validate(), "min_size must be greater than or equal to zero")
require.EqualError(t, cfg.Validate(), "`min_size` must be greater than or equal to zero")

cfg = SizeConfig{
Sizer: SizerTypeBytes,
Sizer: SizerTypeItems,
MaxSize: 100,
MinSize: 200,
}
require.EqualError(t, cfg.Validate(), "max_size must be greater than or equal to mix_size")
}

func TestSizeUnmarshaler(t *testing.T) {
var rawConf map[string]any
cfg := NewDefaultConfig()

require.NoError(t, yaml.Unmarshal([]byte(`sizer: bytes`), &rawConf))
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
require.NoError(t, cfg.Validate())

require.NoError(t, yaml.Unmarshal([]byte(`sizer: "bytes"`), &rawConf))
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
require.NoError(t, cfg.Validate())

require.NoError(t, yaml.Unmarshal([]byte(`sizer: items`), &rawConf))
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
require.NoError(t, cfg.Validate())

require.NoError(t, yaml.Unmarshal([]byte(`sizer: 'items'`), &rawConf))
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
require.NoError(t, cfg.Validate())

require.NoError(t, yaml.Unmarshal([]byte(`sizer: invalid`), &rawConf))
require.EqualError(t,
confmap.NewFromStringMap(rawConf).Unmarshal(&cfg),
"decoding failed due to the following error(s):\n\nerror decoding 'sizer': invalid sizer: \"invalid\"")
require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to mix_size")
}
13 changes: 7 additions & 6 deletions exporter/exporterbatcher/sizer_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ import (
"fmt"
)

type SizerType string
type SizerType struct {
val string
}

const (
SizerTypeItems SizerType = "items"
SizerTypeBytes SizerType = "bytes"
sizerTypeItems = "items"
)

var SizerTypeItems = SizerType{val: sizerTypeItems}

// UnmarshalText implements TextUnmarshaler interface.
func (s *SizerType) UnmarshalText(text []byte) error {
switch str := string(text); str {
case string(SizerTypeItems):
case sizerTypeItems:
*s = SizerTypeItems
case string(SizerTypeBytes):
*s = SizerTypeBytes
default:
return fmt.Errorf("invalid sizer: %q", str)
}
Expand Down
31 changes: 31 additions & 0 deletions exporter/exporterbatcher/sizer_type_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"go.opentelemetry.io/collector/confmap"
)

func TestSizeTypeUnmarshaler(t *testing.T) {
var rawConf map[string]any
cfg := NewDefaultConfig()

require.NoError(t, yaml.Unmarshal([]byte(`sizer: items`), &rawConf))
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
require.NoError(t, cfg.Validate())

require.NoError(t, yaml.Unmarshal([]byte(`sizer: 'items'`), &rawConf))
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
require.NoError(t, cfg.Validate())

require.NoError(t, yaml.Unmarshal([]byte(`sizer: invalid`), &rawConf))
require.ErrorContains(t,
confmap.NewFromStringMap(rawConf).Unmarshal(&cfg),
"decoding failed due to the following error(s):\n\nerror decoding 'sizer': invalid sizer: \"invalid\"")
}
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
}
}

if be.batcherCfg.MinSizeItems != nil || be.batcherCfg.MaxSizeItems != nil {
set.Logger.Warn("Using of deprecated fields `min_size_items` and `max_size_items`")
}

Check warning on line 75 in exporter/exporterhelper/internal/base_exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/base_exporter.go#L74-L75

Added lines #L74 - L75 were not covered by tests

// Consumer Sender is always initialized.
be.firstSender = newSender(func(ctx context.Context, req request.Request) error {
return req.Export(ctx)
Expand Down
Loading

0 comments on commit b000208

Please sign in to comment.