Skip to content

Commit

Permalink
Remove exporterqueue.Factory and simplify exportehelper.WithQueueRequ…
Browse files Browse the repository at this point in the history
…est (#12509)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

<!-- Issue number if applicable -->
#### Link to tracking issue
Fixes #

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Feb 28, 2025
1 parent f38b0e1 commit d510c86
Show file tree
Hide file tree
Showing 21 changed files with 287 additions and 401 deletions.
25 changes: 25 additions & 0 deletions .chloggen/break-exporterqueue-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterqueue

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove exporterqueue.Factory in favor of the NewQueue function, and merge configs for memory and persistent.

# One or more tracking issues or pull requests related to the change
issues: [12509]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: As a side effect of this change, no alternative implementation of the queue are supported and the Queue interface will be hidden.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
25 changes: 25 additions & 0 deletions .chloggen/new-queue-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change the signature of the exporterhelper.WithQueueRequest to accept Encoding instead of the Factory.

# One or more tracking issues or pull requests related to the change
issues: [12509]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ func WithRetry(config configretry.BackOffConfig) Option {
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config internal.QueueConfig) Option {
func WithQueue(config QueueConfig) Option {
return internal.WithQueue(config)
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return internal.WithRequestQueue(cfg, queueFactory)
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
return internal.WithRequestQueue(cfg, encoding)
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
Expand Down
65 changes: 20 additions & 45 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type BaseExporter struct {
component.StartFunc
component.ShutdownFunc

Marshaler exporterqueue.Marshaler[request.Request]
Unmarshaler exporterqueue.Unmarshaler[request.Request]
encoding exporterqueue.Encoding[request.Request]

Set exporter.Settings

Expand All @@ -45,18 +44,16 @@ type BaseExporter struct {

ConsumerOptions []consumer.Option

timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueFactory exporterqueue.Factory[request.Request]
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
be := &BaseExporter{
Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
queueFactory: exporterqueue.NewMemoryQueueFactory[request.Request](),
Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
}

for _, op := range options {
Expand Down Expand Up @@ -98,11 +95,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
}

if be.queueCfg.Enabled || be.batcherCfg.Enabled {
qSet := exporterqueue.Settings{
qSet := exporterqueue.Settings[request.Request]{
Signal: signal,
ExporterSettings: set,
Encoding: be.encoding,
}
be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,44 +197,30 @@ func WithRetry(config configretry.BackOffConfig) Option {
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
func WithQueue(cfg exporterqueue.Config) Option {
return func(o *BaseExporter) error {
if o.Marshaler == nil || o.Unmarshaler == nil {
if o.encoding == nil {
return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
Blocking: config.Blocking,
}
o.queueFactory = exporterqueue.NewPersistentQueueFactory[request.Request](config.StorageID, exporterqueue.PersistentQueueSettings[request.Request]{
Marshaler: o.Marshaler,
Unmarshaler: o.Unmarshaler,
})
return nil
return WithRequestQueue(cfg, o.encoding)(o)
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[request.Request]) Option {
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[request.Request]) Option {
return func(o *BaseExporter) error {
if o.Marshaler != nil || o.Unmarshaler != nil {
return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
if cfg.Enabled && cfg.StorageID != nil && encoding == nil {
return errors.New("`encoding` must not be nil when persistent queue is enabled")
}
o.encoding = encoding
if !cfg.Enabled {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = cfg
o.queueFactory = queueFactory
return nil
}
}
Expand All @@ -263,20 +247,11 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
}
}

// WithMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithMarshaler(marshaler exporterqueue.Marshaler[request.Request]) Option {
return func(o *BaseExporter) error {
o.Marshaler = marshaler
return nil
}
}

// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// WithEncoding is used to set the request encoding for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[request.Request]) Option {
func WithEncoding(encoding exporterqueue.Encoding[request.Request]) Option {
return func(o *BaseExporter) error {
o.Unmarshaler = unmarshaler
o.encoding = encoding
return nil
}
}
21 changes: 11 additions & 10 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,18 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
require.Nil(t, bs.encoding)
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
require.Error(t, err)

qCfg := exporterqueue.NewDefaultConfig()
storageID := component.NewID(component.MustNewType("test"))
qCfg.StorageID = &storageID
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]()))
WithRequestQueue(qCfg, nil))
require.Error(t, err)
}

Expand All @@ -93,7 +95,7 @@ func TestBaseExporterLogging(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal,
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()),
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
WithBatcher(exporterbatcher.NewDefaultConfig()),
WithRetry(rCfg))
require.NoError(t, err)
Expand All @@ -118,10 +120,9 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
{
name: "WithQueue",
queueOptions: []Option{
WithMarshaler(mockRequestMarshaler),
WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
func() Option {
qs := NewDefaultQueueConfig()
qs := exporterqueue.NewDefaultConfig()
qs.Enabled = false
return WithQueue(qs)
}(),
Expand All @@ -138,7 +139,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
func() Option {
qs := exporterqueue.NewDefaultConfig()
qs.Enabled = false
return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[request.Request]())
return WithRequestQueue(qs, newFakeEncoding(&requesttest.FakeRequest{Items: 1}))
}(),
func() Option {
bs := exporterbatcher.NewDefaultConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
tt.maxWorkers)
require.NoError(t, err)

q := exporterqueue.NewMemoryQueueFactory[request.Request]()(
q := exporterqueue.NewQueue[request.Request](
context.Background(),
exporterqueue.Settings{
exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exportertest.NewNopSettings(exportertest.NopType),
},
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/obs_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type obsQueue[T request.Request] struct {
enqueueFailedInst metric.Int64Counter
}

func newObsQueue[T request.Request](set exporterqueue.Settings, delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
if err != nil {
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/internal/obs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 7, 9))
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestObsQueueLogsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 7, 9))
Expand All @@ -100,7 +100,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 17, 19))
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestObsQueueTracesFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
Expand All @@ -150,7 +150,7 @@ func TestObsQueueMetrics(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 27, 29))
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestObsQueueMetricsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
Expand Down
Loading

0 comments on commit d510c86

Please sign in to comment.