Skip to content

Commit

Permalink
Cleanup Queue factory and initialization
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 28, 2025
1 parent beb9002 commit 5bdc715
Show file tree
Hide file tree
Showing 20 changed files with 251 additions and 357 deletions.
25 changes: 25 additions & 0 deletions .chloggen/break-exporterqueue-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterqueue

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove exporterqueue.Factory in favor of the NewQueue function, and merge configs for memory and persistent.

# One or more tracking issues or pull requests related to the change
issues: [12509]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
25 changes: 25 additions & 0 deletions .chloggen/new-queue-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change the signature of the exporterhelper.WithQueueRequest to accept Encoding instead of the Factory.

# One or more tracking issues or pull requests related to the change
issues: [12509]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ func WithRetry(config configretry.BackOffConfig) Option {
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config internal.QueueConfig) Option {
func WithQueue(config QueueConfig) Option {
return internal.WithQueue(config)
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return internal.WithRequestQueue(cfg, queueFactory)
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
return internal.WithRequestQueue(cfg, encoding)
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
Expand Down
65 changes: 20 additions & 45 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type BaseExporter struct {
component.StartFunc
component.ShutdownFunc

Marshaler exporterqueue.Marshaler[request.Request]
Unmarshaler exporterqueue.Unmarshaler[request.Request]
encoding exporterqueue.Encoding[request.Request]

Set exporter.Settings

Expand All @@ -45,18 +44,16 @@ type BaseExporter struct {

ConsumerOptions []consumer.Option

timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueFactory exporterqueue.Factory[request.Request]
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
be := &BaseExporter{
Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
queueFactory: exporterqueue.NewMemoryQueueFactory[request.Request](),
Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
}

for _, op := range options {
Expand Down Expand Up @@ -98,11 +95,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
}

if be.queueCfg.Enabled || be.batcherCfg.Enabled {
qSet := exporterqueue.Settings{
qSet := exporterqueue.Settings[request.Request]{
Signal: signal,
ExporterSettings: set,
Encoding: be.encoding,
}
be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,44 +197,30 @@ func WithRetry(config configretry.BackOffConfig) Option {
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
func WithQueue(cfg exporterqueue.Config) Option {
return func(o *BaseExporter) error {
if o.Marshaler == nil || o.Unmarshaler == nil {
if o.encoding == nil {
return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
Blocking: config.Blocking,
}
o.queueFactory = exporterqueue.NewPersistentQueueFactory[request.Request](config.StorageID, exporterqueue.PersistentQueueSettings[request.Request]{
Marshaler: o.Marshaler,
Unmarshaler: o.Unmarshaler,
})
return nil
return WithRequestQueue(cfg, o.encoding)(o)
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[request.Request]) Option {
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[request.Request]) Option {
return func(o *BaseExporter) error {
if o.Marshaler != nil || o.Unmarshaler != nil {
return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
if cfg.Enabled && cfg.StorageID != nil && encoding == nil {
return errors.New("`encoding` must not be nil when persistent queue is enabled")
}
o.encoding = encoding
if !cfg.Enabled {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = cfg
o.queueFactory = queueFactory
return nil
}
}
Expand All @@ -263,20 +247,11 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
}
}

// WithMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithMarshaler(marshaler exporterqueue.Marshaler[request.Request]) Option {
return func(o *BaseExporter) error {
o.Marshaler = marshaler
return nil
}
}

// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// WithEncoding is used to set the request encoding for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[request.Request]) Option {
func WithEncoding(encoding exporterqueue.Encoding[request.Request]) Option {
return func(o *BaseExporter) error {
o.Unmarshaler = unmarshaler
o.encoding = encoding
return nil
}
}
21 changes: 11 additions & 10 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,18 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
require.Nil(t, bs.encoding)
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
require.Error(t, err)

qCfg := exporterqueue.NewDefaultConfig()
storageId := component.NewID(component.MustNewType("test"))
qCfg.StorageID = &storageId
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]()))
WithRequestQueue(qCfg, nil))
require.Error(t, err)
}

Expand All @@ -93,7 +95,7 @@ func TestBaseExporterLogging(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal,
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()),
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
WithBatcher(exporterbatcher.NewDefaultConfig()),
WithRetry(rCfg))
require.NoError(t, err)
Expand All @@ -118,10 +120,9 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
{
name: "WithQueue",
queueOptions: []Option{
WithMarshaler(mockRequestMarshaler),
WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
func() Option {
qs := NewDefaultQueueConfig()
qs := exporterqueue.NewDefaultConfig()
qs.Enabled = false
return WithQueue(qs)
}(),
Expand All @@ -138,7 +139,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
func() Option {
qs := exporterqueue.NewDefaultConfig()
qs.Enabled = false
return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[request.Request]())
return WithRequestQueue(qs, newFakeEncoding(&requesttest.FakeRequest{Items: 1}))
}(),
func() Option {
bs := exporterbatcher.NewDefaultConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
tt.maxWorkers)
require.NoError(t, err)

q := exporterqueue.NewMemoryQueueFactory[request.Request]()(
q := exporterqueue.NewQueue[request.Request](
context.Background(),
exporterqueue.Settings{
exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exportertest.NewNopSettings(exportertest.NopType),
},
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/obs_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type obsQueue[T request.Request] struct {
enqueueFailedInst metric.Int64Counter
}

func newObsQueue[T request.Request](set exporterqueue.Settings, delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
if err != nil {
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/internal/obs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 7, 9))
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestObsQueueLogsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 7, 9))
Expand All @@ -100,7 +100,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 17, 19))
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestObsQueueTracesFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
Expand All @@ -150,7 +150,7 @@ func TestObsQueueMetrics(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 27, 29))
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestObsQueueMetricsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
Expand Down
44 changes: 4 additions & 40 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,9 @@ var _ = featuregate.GlobalRegistry().MustRegister(
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)

// QueueConfig defines configuration for queueing batches before sending to the consumerSender.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`
// NumConsumers is the number of consumers from the queue. Defaults to 10.
// If batching is enabled, a combined batch cannot contain more requests than the number of consumers.
// So it's recommended to set higher number of consumers if batching is enabled.
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue_size"`
// Blocking controls the queue behavior when full.
// If true it blocks until enough space to add the new request to the queue.
Blocking bool `mapstructure:"blocking"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
}

// NewDefaultQueueConfig returns the default config for QueueConfig.
func NewDefaultQueueConfig() QueueConfig {
return QueueConfig{
func NewDefaultQueueConfig() exporterqueue.Config {
return exporterqueue.Config{
Enabled: true,
NumConsumers: 10,
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
Expand All @@ -56,31 +38,13 @@ func NewDefaultQueueConfig() QueueConfig {
}
}

// Validate checks if the QueueConfig configuration is valid
func (qCfg *QueueConfig) Validate() error {
if !qCfg.Enabled {
return nil
}

if qCfg.QueueSize <= 0 {
return errors.New("`queue_size` must be positive")
}

if qCfg.NumConsumers <= 0 {
return errors.New("`num_consumers` must be positive")
}

return nil
}

type QueueSender struct {
queue exporterqueue.Queue[request.Request]
batcher component.Component
}

func NewQueueSender(
qf exporterqueue.Factory[request.Request],
qSet exporterqueue.Settings,
qSet exporterqueue.Settings[request.Request],
qCfg exporterqueue.Config,
bCfg exporterbatcher.Config,
exportFailureMessage string,
Expand All @@ -106,7 +70,7 @@ func NewQueueSender(
if bCfg.Enabled {
qCfg.NumConsumers = 1
}
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, b.Consume))
q, err := newObsQueue(qSet, exporterqueue.NewQueue(context.Background(), qSet, qCfg, b.Consume))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 5bdc715

Please sign in to comment.