diff --git a/.chloggen/break-exporterqueue-factory.yaml b/.chloggen/break-exporterqueue-factory.yaml new file mode 100644 index 00000000000..78794637bc8 --- /dev/null +++ b/.chloggen/break-exporterqueue-factory.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterqueue + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove exporterqueue.Factory in favor of the NewQueue function, and merge configs for memory and persistent. + +# One or more tracking issues or pull requests related to the change +issues: [12509] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: As a side effect of this change, no alternative implementations of the queue are supported and the Queue interface will be hidden. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/new-queue-factory.yaml b/.chloggen/new-queue-factory.yaml new file mode 100644 index 00000000000..34bbcaee806 --- /dev/null +++ b/.chloggen/new-queue-factory.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. 
otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change the signature of the exporterhelper.WithRequestQueue to accept Encoding instead of the Factory. + +# One or more tracking issues or pull requests related to the change +issues: [12509] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index ab1f0db4e0b..5434a2edc57 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -42,7 +42,7 @@ func WithRetry(config configretry.BackOffConfig) Option { // WithQueue overrides the default QueueConfig for an exporter. // The default QueueConfig is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. -func WithQueue(config internal.QueueConfig) Option { +func WithQueue(config QueueConfig) Option { return internal.WithQueue(config) } @@ -50,8 +50,8 @@ func WithQueue(config internal.QueueConfig) Option { // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
-func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option { - return internal.WithRequestQueue(cfg, queueFactory) +func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option { + return internal.WithRequestQueue(cfg, encoding) } // WithCapabilities overrides the default Capabilities() function for a Consumer. diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 579b0e310d6..8e6f87c32cd 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -27,8 +27,7 @@ type BaseExporter struct { component.StartFunc component.ShutdownFunc - Marshaler exporterqueue.Marshaler[request.Request] - Unmarshaler exporterqueue.Unmarshaler[request.Request] + encoding exporterqueue.Encoding[request.Request] Set exporter.Settings @@ -45,18 +44,16 @@ type BaseExporter struct { ConsumerOptions []consumer.Option - timeoutCfg TimeoutConfig - retryCfg configretry.BackOffConfig - queueFactory exporterqueue.Factory[request.Request] - queueCfg exporterqueue.Config - batcherCfg exporterbatcher.Config + timeoutCfg TimeoutConfig + retryCfg configretry.BackOffConfig + queueCfg exporterqueue.Config + batcherCfg exporterbatcher.Config } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) { be := &BaseExporter{ - Set: set, - timeoutCfg: NewDefaultTimeoutConfig(), - queueFactory: exporterqueue.NewMemoryQueueFactory[request.Request](), + Set: set, + timeoutCfg: NewDefaultTimeoutConfig(), } for _, op := range options { @@ -98,11 +95,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O } if be.queueCfg.Enabled || be.batcherCfg.Enabled { - qSet := exporterqueue.Settings{ + qSet := exporterqueue.Settings[request.Request]{ Signal: signal, ExporterSettings: set, + Encoding: be.encoding, } - be.QueueSender, err = 
NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender) + be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender) if err != nil { return nil, err } @@ -199,26 +197,12 @@ func WithRetry(config configretry.BackOffConfig) Option { // WithQueue overrides the default QueueConfig for an exporter. // The default QueueConfig is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. -func WithQueue(config QueueConfig) Option { +func WithQueue(cfg exporterqueue.Config) Option { return func(o *BaseExporter) error { - if o.Marshaler == nil || o.Unmarshaler == nil { + if o.encoding == nil { return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead") } - if !config.Enabled { - o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." - return nil - } - o.queueCfg = exporterqueue.Config{ - Enabled: config.Enabled, - NumConsumers: config.NumConsumers, - QueueSize: config.QueueSize, - Blocking: config.Blocking, - } - o.queueFactory = exporterqueue.NewPersistentQueueFactory[request.Request](config.StorageID, exporterqueue.PersistentQueueSettings[request.Request]{ - Marshaler: o.Marshaler, - Unmarshaler: o.Unmarshaler, - }) - return nil + return WithRequestQueue(cfg, o.encoding)(o) } } @@ -226,17 +210,17 @@ func WithQueue(config QueueConfig) Option { // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
-func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[request.Request]) Option { +func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[request.Request]) Option { return func(o *BaseExporter) error { - if o.Marshaler != nil || o.Unmarshaler != nil { - return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead") + if cfg.Enabled && cfg.StorageID != nil && encoding == nil { + return errors.New("`encoding` must not be nil when persistent queue is enabled") } + o.encoding = encoding if !cfg.Enabled { o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } o.queueCfg = cfg - o.queueFactory = queueFactory return nil } } @@ -263,20 +247,11 @@ func WithBatcher(cfg exporterbatcher.Config) Option { } } -// WithMarshaler is used to set the request marshaler for the new exporter helper. -// It must be provided as the first option when creating a new exporter helper. -func WithMarshaler(marshaler exporterqueue.Marshaler[request.Request]) Option { - return func(o *BaseExporter) error { - o.Marshaler = marshaler - return nil - } -} - -// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper. +// WithEncoding is used to set the request encoding for the new exporter helper. // It must be provided as the first option when creating a new exporter helper. 
-func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[request.Request]) Option { +func WithEncoding(encoding exporterqueue.Encoding[request.Request]) Option { return func(o *BaseExporter) error { - o.Unmarshaler = unmarshaler + o.encoding = encoding return nil } } diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 92915b45495..797cf3f01d4 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -71,16 +71,18 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) { bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, WithRetry(configretry.NewDefaultBackOffConfig())) require.NoError(t, err) - require.Nil(t, bs.Marshaler) - require.Nil(t, bs.Unmarshaler) + require.Nil(t, bs.encoding) _, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, - WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig())) + WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig())) require.Error(t, err) + qCfg := exporterqueue.NewDefaultConfig() + storageID := component.NewID(component.MustNewType("test")) + qCfg.StorageID = &storageID _, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})), + WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})), WithRetry(configretry.NewDefaultBackOffConfig()), - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]())) + WithRequestQueue(qCfg, nil)) require.Error(t, err) } @@ -93,7 +95,7 @@ func TestBaseExporterLogging(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.Enabled = false bs, err := NewBaseExporter(set, defaultSignal, - 
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()), + WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})), WithBatcher(exporterbatcher.NewDefaultConfig()), WithRetry(rCfg)) require.NoError(t, err) @@ -118,10 +120,9 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { { name: "WithQueue", queueOptions: []Option{ - WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})), + WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})), func() Option { - qs := NewDefaultQueueConfig() + qs := exporterqueue.NewDefaultConfig() qs.Enabled = false return WithQueue(qs) }(), @@ -138,7 +139,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { func() Option { qs := exporterqueue.NewDefaultConfig() qs.Enabled = false - return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[request.Request]()) + return WithRequestQueue(qs, newFakeEncoding(&requesttest.FakeRequest{Items: 1})) }(), func() Option { bs := exporterbatcher.NewDefaultConfig() diff --git a/exporter/exporterhelper/internal/batcher/disabled_batcher_test.go b/exporter/exporterhelper/internal/batcher/disabled_batcher_test.go index 09cd3f4a7f5..f577c2190be 100644 --- a/exporter/exporterhelper/internal/batcher/disabled_batcher_test.go +++ b/exporter/exporterhelper/internal/batcher/disabled_batcher_test.go @@ -45,9 +45,9 @@ func TestDisabledBatcher_Basic(t *testing.T) { tt.maxWorkers) require.NoError(t, err) - q := exporterqueue.NewMemoryQueueFactory[request.Request]()( + q := exporterqueue.NewQueue[request.Request]( context.Background(), - exporterqueue.Settings{ + exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalTraces, ExporterSettings: exportertest.NewNopSettings(exportertest.NopType), }, diff --git a/exporter/exporterhelper/internal/obs_queue.go b/exporter/exporterhelper/internal/obs_queue.go index 20b948532c0..6ab96a7a79f 100644 --- a/exporter/exporterhelper/internal/obs_queue.go +++ 
b/exporter/exporterhelper/internal/obs_queue.go @@ -23,7 +23,7 @@ type obsQueue[T request.Request] struct { enqueueFailedInst metric.Int64Counter } -func newObsQueue[T request.Request](set exporterqueue.Settings, delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) { +func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) { tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings) if err != nil { return nil, err diff --git a/exporter/exporterhelper/internal/obs_queue_test.go b/exporter/exporterhelper/internal/obs_queue_test.go index 3140ffd944b..82a3bb44e65 100644 --- a/exporter/exporterhelper/internal/obs_queue_test.go +++ b/exporter/exporterhelper/internal/obs_queue_test.go @@ -50,7 +50,7 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := newObsQueue[request.Request](exporterqueue.Settings{ + te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalLogs, ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, }, newFakeQueue[request.Request](nil, 7, 9)) @@ -80,7 +80,7 @@ func TestObsQueueLogsFailure(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := newObsQueue[request.Request](exporterqueue.Settings{ + te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalLogs, ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, }, newFakeQueue[request.Request](errors.New("my error"), 7, 9)) @@ -100,7 +100,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) { tt := 
componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := newObsQueue[request.Request](exporterqueue.Settings{ + te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalTraces, ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, }, newFakeQueue[request.Request](nil, 17, 19)) @@ -130,7 +130,7 @@ func TestObsQueueTracesFailure(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := newObsQueue[request.Request](exporterqueue.Settings{ + te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalTraces, ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, }, newFakeQueue[request.Request](errors.New("my error"), 0, 0)) @@ -150,7 +150,7 @@ func TestObsQueueMetrics(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := newObsQueue[request.Request](exporterqueue.Settings{ + te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalMetrics, ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, }, newFakeQueue[request.Request](nil, 27, 29)) @@ -180,7 +180,7 @@ func TestObsQueueMetricsFailure(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := newObsQueue[request.Request](exporterqueue.Settings{ + te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalMetrics, ExporterSettings: exporter.Settings{ID: exporterID, 
TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, }, newFakeQueue[request.Request](errors.New("my error"), 0, 0)) diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 9c0f515e640..de5917d1650 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -25,62 +25,13 @@ var _ = featuregate.GlobalRegistry().MustRegister( featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"), ) -// QueueConfig defines configuration for queueing batches before sending to the consumerSender. -type QueueConfig struct { - // Enabled indicates whether to not enqueue batches before sending to the consumerSender. - Enabled bool `mapstructure:"enabled"` - // NumConsumers is the number of consumers from the queue. Defaults to 10. - // If batching is enabled, a combined batch cannot contain more requests than the number of consumers. - // So it's recommended to set higher number of consumers if batching is enabled. - NumConsumers int `mapstructure:"num_consumers"` - // QueueSize is the maximum number of batches allowed in queue at a given time. - QueueSize int `mapstructure:"queue_size"` - // Blocking controls the queue behavior when full. - // If true it blocks until enough space to add the new request to the queue. - Blocking bool `mapstructure:"blocking"` - // StorageID if not empty, enables the persistent storage and uses the component specified - // as a storage extension for the persistent queue - StorageID *component.ID `mapstructure:"storage"` -} - -// NewDefaultQueueConfig returns the default config for QueueConfig. 
-func NewDefaultQueueConfig() QueueConfig { - return QueueConfig{ - Enabled: true, - NumConsumers: 10, - // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue - // This can be estimated at 1-4 GB worth of maximum memory usage - // This default is probably still too high, and may be adjusted further down in a future release - QueueSize: 1_000, - Blocking: false, - } -} - -// Validate checks if the QueueConfig configuration is valid -func (qCfg *QueueConfig) Validate() error { - if !qCfg.Enabled { - return nil - } - - if qCfg.QueueSize <= 0 { - return errors.New("`queue_size` must be positive") - } - - if qCfg.NumConsumers <= 0 { - return errors.New("`num_consumers` must be positive") - } - - return nil -} - type QueueSender struct { queue exporterqueue.Queue[request.Request] batcher component.Component } func NewQueueSender( - qf exporterqueue.Factory[request.Request], - qSet exporterqueue.Settings, + qSet exporterqueue.Settings[request.Request], qCfg exporterqueue.Config, bCfg exporterbatcher.Config, exportFailureMessage string, @@ -106,7 +57,7 @@ func NewQueueSender( if bCfg.Enabled { qCfg.NumConsumers = 1 } - q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, b.Consume)) + q, err := newObsQueue(qSet, exporterqueue.NewQueue(context.Background(), qSet, qCfg, b.Consume)) if err != nil { return nil, err } diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index df48a739bcf..73726383e50 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -28,6 +28,22 @@ import ( "go.opentelemetry.io/collector/pipeline" ) +type fakeEncoding struct { + mr request.Request +} + +func (f fakeEncoding) Marshal(request.Request) ([]byte, error) { + return []byte("mockRequest"), nil +} + +func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) { + return f.mr, nil +} + +func 
newFakeEncoding(mr request.Request) exporterqueue.Encoding[request.Request] { + return &fakeEncoding{mr: mr} +} + func TestQueueBatcherStopWhileWaiting(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 1 @@ -67,79 +83,37 @@ func TestQueueBatcherDoNotPreserveCancellation(t *testing.T) { } func TestQueueBatcherHappyPath(t *testing.T) { - tests := []struct { - name string - qCfg exporterqueue.Config - qf exporterqueue.Factory[request.Request] - }{ - { - name: "WithRequestQueue/MemoryQueueFactory", - qCfg: exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, - qf: exporterqueue.NewMemoryQueueFactory[request.Request](), - }, - { - name: "WithRequestQueue/PersistentQueueFactory", - qCfg: exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, - qf: exporterqueue.NewPersistentQueueFactory[request.Request](nil, exporterqueue.PersistentQueueSettings[request.Request]{}), - }, + qCfg := exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, } + be, err := newQueueBatcherExporter(qCfg, exporterbatcher.Config{}) + require.NoError(t, err) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be, err := newQueueBatcherExporter(tt.qCfg, exporterbatcher.Config{}) - require.NoError(t, err) - - sink := requesttest.NewSink() - for i := 0; i < 10; i++ { - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: i, Sink: sink})) - } - - // expect queue to be full - require.Error(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink})) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 10 && sink.ItemsCount() == 45 - }, 1*time.Second, 10*time.Millisecond) - require.NoError(t, be.Shutdown(context.Background())) - }) + sink := requesttest.NewSink() + for i := 0; i < 10; i++ { + require.NoError(t, be.Send(context.Background(), 
&requesttest.FakeRequest{Items: i, Sink: sink})) } -} - -func TestQueueConfig_Validate(t *testing.T) { - qCfg := NewDefaultQueueConfig() - require.NoError(t, qCfg.Validate()) - qCfg.QueueSize = 0 - require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive") + // expect queue to be full + require.Error(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink})) - qCfg = NewDefaultQueueConfig() - qCfg.NumConsumers = 0 - - require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive") - - // Confirm Validate doesn't return error with invalid config when feature is disabled - qCfg.Enabled = false - assert.NoError(t, qCfg.Validate()) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + assert.Eventually(t, func() bool { + return sink.RequestsCount() == 10 && sink.ItemsCount() == 45 + }, 1*time.Second, 10*time.Millisecond) + require.NoError(t, be.Shutdown(context.Background())) } func TestQueueFailedRequestDropped(t *testing.T) { - qSet := exporterqueue.Settings{ + qSet := exporterqueue.Settings[request.Request]{ Signal: defaultSignal, ExporterSettings: defaultSettings, } logger, observed := observer.New(zap.ErrorLevel) qSet.ExporterSettings.Logger = zap.New(logger) be, err := NewQueueSender( - exporterqueue.NewMemoryQueueFactory[request.Request](), qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.Config{}, "", newNoopExportSender()) require.NoError(t, err) @@ -151,17 +125,15 @@ func TestQueueFailedRequestDropped(t *testing.T) { } func TestQueueBatcherPersistenceEnabled(t *testing.T) { - qSet := exporterqueue.Settings{ + qSet := exporterqueue.Settings[request.Request]{ Signal: defaultSignal, ExporterSettings: defaultSettings, + Encoding: newFakeEncoding(&requesttest.FakeRequest{}), } + qCfg := exporterqueue.NewDefaultConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - be, err := NewQueueSender( - 
exporterqueue.NewPersistentQueueFactory[request.Request](&storageID, exporterqueue.PersistentQueueSettings[request.Request]{ - Marshaler: mockRequestMarshaler, - Unmarshaler: mockRequestUnmarshaler(&requesttest.FakeRequest{}), - }), - qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.Config{}, "", newNoopExportSender()) + qCfg.StorageID = &storageID + be, err := NewQueueSender(qSet, qCfg, exporterbatcher.Config{}, "", newNoopExportSender()) require.NoError(t, err) extensions := map[component.ID]component.Component{ @@ -177,17 +149,15 @@ func TestQueueBatcherPersistenceEnabled(t *testing.T) { func TestQueueBatcherPersistenceEnabledStorageError(t *testing.T) { storageError := errors.New("could not get storage client") - qSet := exporterqueue.Settings{ + qSet := exporterqueue.Settings[request.Request]{ Signal: defaultSignal, ExporterSettings: defaultSettings, + Encoding: newFakeEncoding(&requesttest.FakeRequest{}), } + qCfg := exporterqueue.NewDefaultConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - be, err := NewQueueSender( - exporterqueue.NewPersistentQueueFactory[request.Request](&storageID, exporterqueue.PersistentQueueSettings[request.Request]{ - Marshaler: mockRequestMarshaler, - Unmarshaler: mockRequestUnmarshaler(&requesttest.FakeRequest{}), - }), - qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.Config{}, "", newNoopExportSender()) + qCfg.StorageID = &storageID + be, err := NewQueueSender(qSet, qCfg, exporterbatcher.Config{}, "", newNoopExportSender()) require.NoError(t, err) extensions := map[component.ID]component.Component{ @@ -202,6 +172,8 @@ func TestQueueBatcherPersistenceEnabledStorageError(t *testing.T) { func TestQueueBatcherPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 1 + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 
time.Millisecond @@ -209,18 +181,13 @@ func TestQueueBatcherPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { rs := newRetrySender(rCfg, defaultSettings, newNoopExportSender()) require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost())) - qSet := exporterqueue.Settings{ + mockReq := newErrorRequest(errors.New("transient error")) + qSet := exporterqueue.Settings[request.Request]{ Signal: defaultSignal, ExporterSettings: defaultSettings, + Encoding: newFakeEncoding(mockReq), } - storageID := component.MustNewIDWithName("file_storage", "storage") - mockReq := newErrorRequest(errors.New("transient error")) - be, err := NewQueueSender( - exporterqueue.NewPersistentQueueFactory[request.Request](&storageID, exporterqueue.PersistentQueueSettings[request.Request]{ - Marshaler: mockRequestMarshaler, - Unmarshaler: mockRequestUnmarshaler(mockReq), - }), - qSet, qCfg, exporterbatcher.Config{}, "", rs) + be, err := NewQueueSender(qSet, qCfg, exporterbatcher.Config{}, "", rs) require.NoError(t, err) extensions := map[component.ID]component.Component{ @@ -245,12 +212,8 @@ func TestQueueBatcherPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. 
sink := requesttest.NewSink() replacedReq := &requesttest.FakeRequest{Items: 7, Sink: sink} - be, err = NewQueueSender( - exporterqueue.NewPersistentQueueFactory[request.Request](&storageID, exporterqueue.PersistentQueueSettings[request.Request]{ - Marshaler: mockRequestMarshaler, - Unmarshaler: mockRequestUnmarshaler(replacedReq), - }), - qSet, qCfg, exporterbatcher.Config{}, "", newNoopExportSender()) + qSet.Encoding = newFakeEncoding(replacedReq) + be, err = NewQueueSender(qSet, qCfg, exporterbatcher.Config{}, "", newNoopExportSender()) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), host)) @@ -263,11 +226,11 @@ func TestQueueBatcherPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { func TestQueueBatcherNoStartShutdown(t *testing.T) { set := exportertest.NewNopSettings(exportertest.NopType) set.ID = exporterID - qSet := exporterqueue.Settings{ + qSet := exporterqueue.Settings[request.Request]{ Signal: pipeline.SignalTraces, ExporterSettings: set, } - qs, err := NewQueueSender(exporterqueue.NewMemoryQueueFactory[request.Request](), qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.NewDefaultConfig(), "", newNoopExportSender()) + qs, err := NewQueueSender(qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.NewDefaultConfig(), "", newNoopExportSender()) require.NoError(t, err) assert.NoError(t, qs.Shutdown(context.Background())) } @@ -284,20 +247,20 @@ func TestQueueBatcher_Merge(t *testing.T) { { name: "split_disabled", batchCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSize = 10 - cfg.FlushTimeout = 100 * time.Millisecond - return cfg + qCfg := exporterbatcher.NewDefaultConfig() + qCfg.MinSize = 10 + qCfg.FlushTimeout = 100 * time.Millisecond + return qCfg }(), }, { name: "split_high_limit", batchCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSize = 10 - cfg.FlushTimeout = 100 * time.Millisecond - cfg.MaxSize = 1000 - return cfg + qCfg := 
exporterbatcher.NewDefaultConfig() + qCfg.MinSize = 10 + qCfg.FlushTimeout = 100 * time.Millisecond + qCfg.MaxSize = 1000 + return qCfg }(), }, } @@ -673,9 +636,9 @@ func TestQueueBatcherTimerFlush(t *testing.T) { } func newQueueBatcherExporter(qCfg exporterqueue.Config, bCfg exporterbatcher.Config) (*QueueSender, error) { - qSet := exporterqueue.Settings{ + qSet := exporterqueue.Settings[request.Request]{ Signal: defaultSignal, ExporterSettings: defaultSettings, } - return NewQueueSender(exporterqueue.NewMemoryQueueFactory[request.Request](), qSet, qCfg, bCfg, "", newNoopExportSender()) + return NewQueueSender(qSet, qCfg, bCfg, "", newNoopExportSender()) } diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 20a404485f0..b6ae31b1d8f 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -21,20 +21,9 @@ import ( "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" - "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" ) -func mockRequestUnmarshaler(mr request.Request) exporterqueue.Unmarshaler[request.Request] { - return func([]byte) (request.Request, error) { - return mr, nil - } -} - -func mockRequestMarshaler(request.Request) ([]byte, error) { - return []byte("mockRequest"), nil -} - func TestRetrySenderDropOnPermanentError(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() sink := requesttest.NewSink() diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 88921825e24..8af6f3c56ff 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/exporter" 
"go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" - "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pipeline" ) @@ -39,17 +38,19 @@ func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request { } } -func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) exporterqueue.Unmarshaler[Request] { - return func(bytes []byte) (Request, error) { - logs, err := logsUnmarshaler.UnmarshalLogs(bytes) - if err != nil { - return nil, err - } - return newLogsRequest(logs, pusher), nil +type logsEncoding struct { + pusher consumer.ConsumeLogsFunc +} + +func (le *logsEncoding) Unmarshal(bytes []byte) (Request, error) { + logs, err := logsUnmarshaler.UnmarshalLogs(bytes) + if err != nil { + return nil, err } + return newLogsRequest(logs, le.pusher), nil } -func logsRequestMarshaler(req Request) ([]byte, error) { +func (le *logsEncoding) Marshal(req Request) ([]byte, error) { return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) } @@ -85,7 +86,7 @@ type logsExporter struct { consumer.Logs } -// NewLogs creates an exporter.Logs that records observability metrics and wraps every request with a Span. +// NewLogs creates an exporter.Logs that records observability logs and wraps every request with a Span. func NewLogs( ctx context.Context, set exporter.Settings, @@ -99,10 +100,7 @@ func NewLogs( if pusher == nil { return nil, errNilPushLogsData } - logsOpts := []Option{ - internal.WithMarshaler(logsRequestMarshaler), internal.WithUnmarshaler(newLogsRequestUnmarshalerFunc(pusher)), - } - return NewLogsRequest(ctx, set, requestFromLogs(pusher), append(logsOpts, options...)...) + return NewLogsRequest(ctx, set, requestFromLogs(pusher), append([]Option{internal.WithEncoding(&logsEncoding{pusher: pusher})}, options...)...) } // RequestFromLogsFunc converts plog.Logs data into a user-defined request. 
diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index cc4b090e9fe..b374b8aa689 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pipeline" ) @@ -38,17 +37,19 @@ func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) R } } -func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) exporterqueue.Unmarshaler[Request] { - return func(bytes []byte) (Request, error) { - metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) - if err != nil { - return nil, err - } - return newMetricsRequest(metrics, pusher), nil +type metricsEncoding struct { + pusher consumer.ConsumeMetricsFunc +} + +func (me *metricsEncoding) Unmarshal(bytes []byte) (Request, error) { + metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) + if err != nil { + return nil, err } + return newMetricsRequest(metrics, me.pusher), nil } -func metricsRequestMarshaler(req Request) ([]byte, error) { +func (me *metricsEncoding) Marshal(req Request) ([]byte, error) { return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) } @@ -91,10 +92,7 @@ func NewMetrics( if pusher == nil { return nil, errNilPushMetricsData } - metricsOpts := []Option{ - internal.WithMarshaler(metricsRequestMarshaler), internal.WithUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher)), - } - return NewMetricsRequest(ctx, set, requestFromMetrics(pusher), append(metricsOpts, options...)...) + return NewMetricsRequest(ctx, set, requestFromMetrics(pusher), append([]Option{internal.WithEncoding(&metricsEncoding{pusher: pusher})}, options...)...) 
} // RequestFromMetricsFunc converts pdata.Metrics into a user-defined request. diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 6d15988c443..3d5f960539c 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -70,8 +70,7 @@ func TestMetrics_NilLogger(t *testing.T) { } func TestMetricsRequest_NilLogger(t *testing.T) { - me, err := NewMetricsRequest(context.Background(), exporter.Settings{}, - requesttest.RequestFromMetricsFunc(nil)) + me, err := NewMetricsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromMetricsFunc(nil)) require.Nil(t, me) require.Equal(t, errNilLogger, err) } diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index f09e03f2b94..416d48cb3bc 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -4,13 +4,21 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" import ( - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" ) // QueueConfig defines configuration for queueing batches before sending to the consumerSender. -type QueueConfig = internal.QueueConfig +type QueueConfig = exporterqueue.Config // NewDefaultQueueConfig returns the default config for QueueConfig. 
func NewDefaultQueueConfig() QueueConfig { - return internal.NewDefaultQueueConfig() + return exporterqueue.Config{ + Enabled: true, + NumConsumers: 10, + // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue + // This can be estimated at 1-4 GB worth of maximum memory usage + // This default is probably still too high, and may be adjusted further down in a future release + QueueSize: 1_000, + Blocking: false, + } } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index f4ade843b9a..1c88dbf83bd 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" ) @@ -38,17 +37,19 @@ func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Reque } } -func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) exporterqueue.Unmarshaler[Request] { - return func(bytes []byte) (Request, error) { - traces, err := tracesUnmarshaler.UnmarshalTraces(bytes) - if err != nil { - return nil, err - } - return newTracesRequest(traces, pusher), nil +type tracesEncoding struct { + pusher consumer.ConsumeTracesFunc +} + +func (te *tracesEncoding) Unmarshal(bytes []byte) (Request, error) { + traces, err := tracesUnmarshaler.UnmarshalTraces(bytes) + if err != nil { + return nil, err } + return newTracesRequest(traces, te.pusher), nil } -func tracesRequestMarshaler(req Request) ([]byte, error) { +func (te *tracesEncoding) Marshal(req Request) ([]byte, error) { return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) } @@ -91,10 +92,7 @@ func NewTraces( if pusher == nil { return nil, errNilPushTraceData } - tracesOpts := []Option{ - 
internal.WithMarshaler(tracesRequestMarshaler), internal.WithUnmarshaler(newTraceRequestUnmarshalerFunc(pusher)), - } - return NewTracesRequest(ctx, set, requestFromTraces(pusher), append(tracesOpts, options...)...) + return NewTracesRequest(ctx, set, requestFromTraces(pusher), append([]Option{internal.WithEncoding(&tracesEncoding{pusher: pusher})}, options...)...) } // RequestFromTracesFunc converts ptrace.Traces into a user-defined Request. diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index 3a74bf0a2de..d41cfbd2d53 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/xexporter" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pipeline/xpipeline" @@ -41,17 +40,19 @@ func newProfilesRequest(pd pprofile.Profiles, pusher xconsumer.ConsumeProfilesFu } } -func newProfileRequestUnmarshalerFunc(pusher xconsumer.ConsumeProfilesFunc) exporterqueue.Unmarshaler[exporterhelper.Request] { - return func(bytes []byte) (exporterhelper.Request, error) { - profiles, err := profilesUnmarshaler.UnmarshalProfiles(bytes) - if err != nil { - return nil, err - } - return newProfilesRequest(profiles, pusher), nil +type profilesEncoding struct { + pusher xconsumer.ConsumeProfilesFunc +} + +func (le *profilesEncoding) Unmarshal(bytes []byte) (exporterhelper.Request, error) { + profiles, err := profilesUnmarshaler.UnmarshalProfiles(bytes) + if err != nil { + return nil, err } + return newProfilesRequest(profiles, le.pusher), nil } -func profilesRequestMarshaler(req exporterhelper.Request) ([]byte, error) { +func (le *profilesEncoding) 
Marshal(req exporterhelper.Request) ([]byte, error) { return profilesMarshaler.MarshalProfiles(req.(*profilesRequest).pd) } @@ -94,10 +95,8 @@ func NewProfilesExporter( if pusher == nil { return nil, errNilPushProfileData } - profilesOpts := []exporterhelper.Option{ - internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)), - } - return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...) + opts := []exporterhelper.Option{internal.WithEncoding(&profilesEncoding{pusher: pusher})} + return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(opts, options...)...) } // RequestFromProfilesFunc converts pprofile.Profiles into a user-defined Request. diff --git a/exporter/exporterqueue/config.go b/exporter/exporterqueue/config.go index f68599a0d17..eb726465942 100644 --- a/exporter/exporterqueue/config.go +++ b/exporter/exporterqueue/config.go @@ -23,6 +23,9 @@ type Config struct { // Blocking controls the queue behavior when full. // If true it blocks until enough space to add the new request to the queue. Blocking bool `mapstructure:"blocking"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *component.ID `mapstructure:"storage"` } // NewDefaultConfig returns the default Config. @@ -50,16 +53,3 @@ func (qCfg *Config) Validate() error { } return nil } - -// PersistentQueueConfig defines configuration for queueing requests in a persistent storage. -// The struct is provided to be added in the exporter configuration as one struct under the "sending_queue" key. -// The exporter helper Go interface requires the fields to be provided separately to WithRequestQueue and -// NewPersistentQueueFactory. 
-// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type PersistentQueueConfig struct { - Config `mapstructure:",squash"` - // StorageID if not empty, enables the persistent storage and uses the component specified - // as a storage extension for the persistent queue - StorageID *component.ID `mapstructure:"storage"` -} diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go index 44d5b69b24b..9a51bc529d7 100644 --- a/exporter/exporterqueue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -45,14 +45,13 @@ var indexDonePool = sync.Pool{ } type persistentQueueSettings[T any] struct { - sizer sizer[T] - capacity int64 - blocking bool - signal pipeline.Signal - storageID component.ID - marshaler Marshaler[T] - unmarshaler Unmarshaler[T] - set exporter.Settings + sizer sizer[T] + capacity int64 + blocking bool + signal pipeline.Signal + storageID component.ID + encoding Encoding[T] + set exporter.Settings } // persistentQueue provides a persistent queue implementation backed by file storage extension @@ -250,7 +249,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { } } - reqBuf, err := pq.set.marshaler(req) + reqBuf, err := pq.set.encoding.Marshal(req) if err != nil { return err } @@ -326,7 +325,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) var request T if err == nil { - request, err = pq.set.unmarshaler(getOp.Value) + request, err = pq.set.encoding.Unmarshal(getOp.Value) } if err != nil { @@ -435,7 +434,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } - req, err := pq.set.unmarshaler(op.Value) + req, err := 
pq.set.encoding.Unmarshal(op.Value) // If error happened or item is nil, it will be efficiently ignored if err != nil { pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go index ffb897f41a6..12b6bc202e9 100644 --- a/exporter/exporterqueue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -38,11 +38,13 @@ func (is *itemsSizer) Sizeof(val uint64) int64 { return int64(val) } -func uint64Marshaler(val uint64) ([]byte, error) { +type uint64Encoding struct{} + +func (uint64Encoding) Marshal(val uint64) ([]byte, error) { return binary.LittleEndian.AppendUint64([]byte{}, val), nil } -func uint64Unmarshaler(bytes []byte) (uint64, error) { +func (uint64Encoding) Unmarshal(bytes []byte) (uint64, error) { if len(bytes) < 8 { return 0, errInvalidValue } @@ -221,13 +223,12 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[uint64], capaci consumeFunc func(_ context.Context, item uint64) error, ) Queue[uint64] { pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ - sizer: sizer, - capacity: capacity, - signal: pipeline.SignalTraces, - storageID: component.ID{}, - marshaler: uint64Marshaler, - unmarshaler: uint64Unmarshaler, - set: exportertest.NewNopSettings(exportertest.NopType), + sizer: sizer, + capacity: capacity, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: uint64Encoding{}, + set: exportertest.NewNopSettings(exportertest.NopType), }) ac := newAsyncQueue(pq, numConsumers, func(ctx context.Context, item uint64, done Done) { done.OnDone(consumeFunc(ctx, item)) @@ -244,13 +245,12 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[uint64], capaci func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[uint64] { pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ - sizer: 
&requestSizer[uint64]{}, - capacity: 1000, - signal: pipeline.SignalTraces, - storageID: component.ID{}, - marshaler: uint64Marshaler, - unmarshaler: uint64Unmarshaler, - set: exportertest.NewNopSettings(exportertest.NopType), + sizer: &requestSizer[uint64]{}, + capacity: 1000, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: uint64Encoding{}, + set: exportertest.NewNopSettings(exportertest.NopType), }).(*persistentQueue[uint64]) pq.initClient(context.Background(), client) return pq @@ -268,13 +268,12 @@ func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Ext capacity int64, ) *persistentQueue[uint64] { pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ - sizer: sizer, - capacity: capacity, - signal: pipeline.SignalTraces, - storageID: component.ID{}, - marshaler: uint64Marshaler, - unmarshaler: uint64Unmarshaler, - set: exportertest.NewNopSettings(exportertest.NopType), + sizer: sizer, + capacity: capacity, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: uint64Encoding{}, + set: exportertest.NewNopSettings(exportertest.NopType), }).(*persistentQueue[uint64]) require.NoError(tb, pq.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{{}: ext}})) return pq @@ -415,14 +414,13 @@ func TestPersistentBlockingQueue(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ - sizer: tt.sizer, - capacity: 100, - blocking: true, - signal: pipeline.SignalTraces, - storageID: component.ID{}, - marshaler: uint64Marshaler, - unmarshaler: uint64Unmarshaler, - set: exportertest.NewNopSettings(exportertest.NopType), + sizer: tt.sizer, + capacity: 100, + blocking: true, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: uint64Encoding{}, + set: exportertest.NewNopSettings(exportertest.NopType), }) consumed := &atomic.Int64{} ac := newAsyncQueue(pq, 10, func(_ 
context.Context, _ uint64, done Done) { @@ -917,7 +915,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { } func TestPersistentQueue_StorageFull(t *testing.T) { - marshaled, err := uint64Marshaler(uint64(50)) + marshaled, err := uint64Encoding{}.Marshal(uint64(50)) require.NoError(t, err) maxSizeInBytes := len(marshaled) * 5 // arbitrary small number diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index aa73c5708bc..409bb0b6968 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -56,75 +56,45 @@ type readableQueue[T any] interface { } // Settings defines settings for creating a queue. -type Settings struct { +type Settings[T any] struct { Signal pipeline.Signal ExporterSettings exporter.Settings + Encoding Encoding[T] } -// Marshaler is a function that can marshal a request into bytes. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type Marshaler[T any] func(T) ([]byte, error) - -// Unmarshaler is a function that can unmarshal bytes into a request. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type Unmarshaler[T any] func([]byte) (T, error) +type Encoding[T any] interface { + // Marshal is a function that can marshal a request into bytes. + Marshal(T) ([]byte, error) -// Factory is a function that creates a new queue. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
-type Factory[T any] func(context.Context, Settings, Config, ConsumeFunc[T]) Queue[T] - -// NewMemoryQueueFactory returns a factory to create a new memory queue. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func NewMemoryQueueFactory[T any]() Factory[T] { - return func(_ context.Context, _ Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] { - if !cfg.Enabled { - return newDisabledQueue(consume) - } - q := newMemoryQueue[T](memoryQueueSettings[T]{ - sizer: &requestSizer[T]{}, - capacity: int64(cfg.QueueSize), - blocking: cfg.Blocking, - }) - return newAsyncQueue(q, cfg.NumConsumers, consume) - } + // Unmarshal is a function that can unmarshal bytes into a request. + Unmarshal([]byte) (T, error) } -// PersistentQueueSettings defines developer settings for the persistent queue factory. +// NewQueue returns a queue // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type PersistentQueueSettings[T any] struct { - // Marshaler is used to serialize queue elements before storing them in the persistent storage. - Marshaler Marshaler[T] - // Unmarshaler is used to deserialize requests after reading them from the persistent storage. - Unmarshaler Unmarshaler[T] -} - -// NewPersistentQueueFactory returns a factory to create a new persistent queue. -// If cfg.storageID is nil then it falls back to memory queue. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
-func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings PersistentQueueSettings[T]) Factory[T] { - if storageID == nil { - return NewMemoryQueueFactory[T]() +func NewQueue[T any](_ context.Context, set Settings[T], cfg Config, consume ConsumeFunc[T]) Queue[T] { + if !cfg.Enabled { + return newDisabledQueue(consume) } - return func(_ context.Context, set Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] { - if !cfg.Enabled { - return newDisabledQueue(consume) - } + if cfg.StorageID != nil { q := newPersistentQueue[T](persistentQueueSettings[T]{ - sizer: &requestSizer[T]{}, - capacity: int64(cfg.QueueSize), - blocking: cfg.Blocking, - signal: set.Signal, - storageID: *storageID, - marshaler: factorySettings.Marshaler, - unmarshaler: factorySettings.Unmarshaler, - set: set.ExporterSettings, + sizer: &requestSizer[T]{}, + capacity: int64(cfg.QueueSize), + blocking: cfg.Blocking, + signal: set.Signal, + storageID: *cfg.StorageID, + encoding: set.Encoding, + set: set.ExporterSettings, }) return newAsyncQueue(q, cfg.NumConsumers, consume) } + + q := newMemoryQueue[T](memoryQueueSettings[T]{ + sizer: &requestSizer[T]{}, + capacity: int64(cfg.QueueSize), + blocking: cfg.Blocking, + }) + + return newAsyncQueue(q, cfg.NumConsumers, consume) }