Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove exporterqueue.Factory and simplify exportehelper.WithQueueRequest #12509

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/break-exporterqueue-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterqueue

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove exporterqueue.Factory in favor of the NewQueue function, and merge configs for memory and persistent.

# One or more tracking issues or pull requests related to the change
issues: [12509]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: As a side effect of this change, no alternative implementation of the queue are supported and the Queue interface will be hidden.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
25 changes: 25 additions & 0 deletions .chloggen/new-queue-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change the signature of the exporterhelper.WithQueueRequest to accept Encoding instead of the Factory.

# One or more tracking issues or pull requests related to the change
issues: [12509]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config internal.QueueConfig) Option {
func WithQueue(config QueueConfig) Option {
return internal.WithQueue(config)
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return internal.WithRequestQueue(cfg, queueFactory)
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
return internal.WithRequestQueue(cfg, encoding)

Check warning on line 54 in exporter/exporterhelper/common.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/common.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
Expand Down
65 changes: 20 additions & 45 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type BaseExporter struct {
component.StartFunc
component.ShutdownFunc

Marshaler exporterqueue.Marshaler[request.Request]
Unmarshaler exporterqueue.Unmarshaler[request.Request]
encoding exporterqueue.Encoding[request.Request]

Set exporter.Settings

Expand All @@ -45,18 +44,16 @@ type BaseExporter struct {

ConsumerOptions []consumer.Option

timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueFactory exporterqueue.Factory[request.Request]
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
be := &BaseExporter{
Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
queueFactory: exporterqueue.NewMemoryQueueFactory[request.Request](),
Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
}

for _, op := range options {
Expand Down Expand Up @@ -98,11 +95,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
}

if be.queueCfg.Enabled || be.batcherCfg.Enabled {
qSet := exporterqueue.Settings{
qSet := exporterqueue.Settings[request.Request]{
Signal: signal,
ExporterSettings: set,
Encoding: be.encoding,
}
be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,44 +197,30 @@ func WithRetry(config configretry.BackOffConfig) Option {
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
func WithQueue(cfg exporterqueue.Config) Option {
return func(o *BaseExporter) error {
if o.Marshaler == nil || o.Unmarshaler == nil {
if o.encoding == nil {
return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
Blocking: config.Blocking,
}
o.queueFactory = exporterqueue.NewPersistentQueueFactory[request.Request](config.StorageID, exporterqueue.PersistentQueueSettings[request.Request]{
Marshaler: o.Marshaler,
Unmarshaler: o.Unmarshaler,
})
return nil
return WithRequestQueue(cfg, o.encoding)(o)
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[request.Request]) Option {
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[request.Request]) Option {
return func(o *BaseExporter) error {
if o.Marshaler != nil || o.Unmarshaler != nil {
return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
if cfg.Enabled && cfg.StorageID != nil && encoding == nil {
return errors.New("`encoding` must not be nil when persistent queue is enabled")
}
o.encoding = encoding
if !cfg.Enabled {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = cfg
o.queueFactory = queueFactory
return nil
}
}
Expand All @@ -263,20 +247,11 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
}
}

// WithMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithMarshaler(marshaler exporterqueue.Marshaler[request.Request]) Option {
return func(o *BaseExporter) error {
o.Marshaler = marshaler
return nil
}
}

// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// WithEncoding is used to set the request encoding for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[request.Request]) Option {
func WithEncoding(encoding exporterqueue.Encoding[request.Request]) Option {
return func(o *BaseExporter) error {
o.Unmarshaler = unmarshaler
o.encoding = encoding
return nil
}
}
21 changes: 11 additions & 10 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,18 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
require.Nil(t, bs.encoding)
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
require.Error(t, err)

qCfg := exporterqueue.NewDefaultConfig()
storageID := component.NewID(component.MustNewType("test"))
qCfg.StorageID = &storageID
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]()))
WithRequestQueue(qCfg, nil))
require.Error(t, err)
}

Expand All @@ -93,7 +95,7 @@ func TestBaseExporterLogging(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal,
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()),
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
WithBatcher(exporterbatcher.NewDefaultConfig()),
WithRetry(rCfg))
require.NoError(t, err)
Expand All @@ -118,10 +120,9 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
{
name: "WithQueue",
queueOptions: []Option{
WithMarshaler(mockRequestMarshaler),
WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
func() Option {
qs := NewDefaultQueueConfig()
qs := exporterqueue.NewDefaultConfig()
qs.Enabled = false
return WithQueue(qs)
}(),
Expand All @@ -138,7 +139,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
func() Option {
qs := exporterqueue.NewDefaultConfig()
qs.Enabled = false
return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[request.Request]())
return WithRequestQueue(qs, newFakeEncoding(&requesttest.FakeRequest{Items: 1}))
}(),
func() Option {
bs := exporterbatcher.NewDefaultConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
tt.maxWorkers)
require.NoError(t, err)

q := exporterqueue.NewMemoryQueueFactory[request.Request]()(
q := exporterqueue.NewQueue[request.Request](
context.Background(),
exporterqueue.Settings{
exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exportertest.NewNopSettings(exportertest.NopType),
},
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/obs_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type obsQueue[T request.Request] struct {
enqueueFailedInst metric.Int64Counter
}

func newObsQueue[T request.Request](set exporterqueue.Settings, delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
if err != nil {
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/internal/obs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 7, 9))
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestObsQueueLogsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalLogs,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 7, 9))
Expand All @@ -100,7 +100,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 17, 19))
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestObsQueueTracesFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalTraces,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
Expand All @@ -150,7 +150,7 @@ func TestObsQueueMetrics(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](nil, 27, 29))
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestObsQueueMetricsFailure(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](exporterqueue.Settings{
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
Signal: pipeline.SignalMetrics,
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
Expand Down
Loading
Loading