diff --git a/.chloggen/iblancasa-memorylimiterprocessor-add-support-profiles.yaml b/.chloggen/iblancasa-memorylimiterprocessor-add-support-profiles.yaml new file mode 100644 index 00000000000..4f55e9c3d8d --- /dev/null +++ b/.chloggen/iblancasa-memorylimiterprocessor-add-support-profiles.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: memorylimiterprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for profiles. + +# One or more tracking issues or pull requests related to the change +issues: [12453] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/builder/internal/builder/main_test.go b/cmd/builder/internal/builder/main_test.go index 880917b21bd..b9d10b70779 100644 --- a/cmd/builder/internal/builder/main_test.go +++ b/cmd/builder/internal/builder/main_test.go @@ -97,6 +97,7 @@ var replaceModules = []string{ "/processor/processortest", "/processor/batchprocessor", "/processor/memorylimiterprocessor", + "/processor/processorhelper/xprocessorhelper", "/processor/xprocessor", "/receiver", "/receiver/nopreceiver", diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index 0908e60eb68..0004574d3e7 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -99,6 +99,7 @@ replaces: - go.opentelemetry.io/collector/processor/batchprocessor => ../../processor/batchprocessor - go.opentelemetry.io/collector/processor/memorylimiterprocessor => ../../processor/memorylimiterprocessor - go.opentelemetry.io/collector/processor/xprocessor => ../../processor/xprocessor + - go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper => ../../processor/processorhelper/xprocessorhelper - go.opentelemetry.io/collector/receiver => ../../receiver - go.opentelemetry.io/collector/receiver/nopreceiver => ../../receiver/nopreceiver - go.opentelemetry.io/collector/receiver/otlpreceiver => ../../receiver/otlpreceiver diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index dd1704e4620..4880850ab49 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -118,6 +118,7 @@ require ( go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect go.opentelemetry.io/collector/pipeline v0.120.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 // indirect + go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 // indirect go.opentelemetry.io/collector/processor/processortest v0.120.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.120.0 // indirect @@ -287,6 +288,8 @@ replace go.opentelemetry.io/collector/processor/memorylimiterprocessor => ../../ replace go.opentelemetry.io/collector/processor/xprocessor => ../../processor/xprocessor +replace go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper => ../../processor/processorhelper/xprocessorhelper + replace go.opentelemetry.io/collector/receiver => ../../receiver replace go.opentelemetry.io/collector/receiver/nopreceiver => ../../receiver/nopreceiver diff --git a/processor/memorylimiterprocessor/README.md b/processor/memorylimiterprocessor/README.md index 661d86508c1..492f0e1d73b 100644 --- a/processor/memorylimiterprocessor/README.md +++ b/processor/memorylimiterprocessor/README.md @@ -3,10 +3,12 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: traces, metrics, logs | +| Stability | [alpha]: profiles | +| | [beta]: traces, metrics, logs | | Distributions | [core], [contrib], [k8s] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fmemorylimiter%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fmemorylimiter%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) | +[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha [beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta [core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib diff --git a/processor/memorylimiterprocessor/factory.go b/processor/memorylimiterprocessor/factory.go index 39ae9bf782a..59c95cec9bf 100644 --- a/processor/memorylimiterprocessor/factory.go +++ b/processor/memorylimiterprocessor/factory.go @@ -11,12 +11,15 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/internal/memorylimiter" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/internal/telemetry/componentattribute" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper" + "go.opentelemetry.io/collector/processor/xprocessor" ) var processorCapabilities = consumer.Capabilities{MutatesData: false} @@ -29,16 +32,17 @@ type factory struct { } // NewFactory returns a new factory for the Memory Limiter processor. -func NewFactory() processor.Factory { +func NewFactory() xprocessor.Factory { f := &factory{ memoryLimiters: map[component.Config]*memoryLimiterProcessor{}, } - return processor.NewFactory( + return xprocessor.NewFactory( metadata.Type, createDefaultConfig, - processor.WithTraces(f.createTraces, metadata.TracesStability), - processor.WithMetrics(f.createMetrics, metadata.MetricsStability), - processor.WithLogs(f.createLogs, metadata.LogsStability)) + xprocessor.WithTraces(f.createTraces, metadata.TracesStability), + xprocessor.WithMetrics(f.createMetrics, metadata.MetricsStability), + xprocessor.WithLogs(f.createLogs, metadata.LogsStability), + xprocessor.WithProfiles(f.createProfiles, metadata.ProfilesStability)) } // CreateDefaultConfig creates the default configuration for processor. Notice @@ -98,6 +102,29 @@ func (f *factory) createLogs( processorhelper.WithShutdown(memLimiter.shutdown)) } +func (f *factory) createProfiles( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer xconsumer.Profiles, +) (xprocessor.Profiles, error) { + memLimiter, err := f.getMemoryLimiter(set, cfg) + if err != nil { + return nil, err + } + + return xprocessorhelper.NewProfiles( + ctx, + set, + cfg, + nextConsumer, + memLimiter.processProfiles, + xprocessorhelper.WithCapabilities(processorCapabilities), + xprocessorhelper.WithStart(memLimiter.start), + xprocessorhelper.WithShutdown(memLimiter.shutdown), + ) +} + // getMemoryLimiter checks if we have a cached memoryLimiter with a specific config, // otherwise initialize and add one to the store. func (f *factory) getMemoryLimiter(set processor.Settings, cfg component.Config) (*memoryLimiterProcessor, error) { diff --git a/processor/memorylimiterprocessor/factory_test.go b/processor/memorylimiterprocessor/factory_test.go index d824560271d..dba8615dada 100644 --- a/processor/memorylimiterprocessor/factory_test.go +++ b/processor/memorylimiterprocessor/factory_test.go @@ -68,11 +68,17 @@ func TestCreateProcessor(t *testing.T) { lp, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) assert.NotNil(t, lp) - assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + + pp, err := factory.CreateProfiles(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NotNil(t, pp) + assert.NoError(t, pp.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, lp.Shutdown(context.Background())) assert.NoError(t, tp.Shutdown(context.Background())) assert.NoError(t, mp.Shutdown(context.Background())) + assert.NoError(t, pp.Shutdown(context.Background())) // verify that no monitoring routine is running require.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted) diff --git a/processor/memorylimiterprocessor/go.mod b/processor/memorylimiterprocessor/go.mod index 1dafa679120..cf6792008a3 100644 --- a/processor/memorylimiterprocessor/go.mod +++ b/processor/memorylimiterprocessor/go.mod @@ -10,12 +10,17 @@ require ( go.opentelemetry.io/collector/consumer v1.26.0 go.opentelemetry.io/collector/consumer/consumererror v0.120.0 go.opentelemetry.io/collector/consumer/consumertest v0.120.0 + go.opentelemetry.io/collector/consumer/xconsumer v0.120.0 go.opentelemetry.io/collector/internal/memorylimiter v0.120.0 go.opentelemetry.io/collector/internal/telemetry v0.120.0 go.opentelemetry.io/collector/pdata v1.26.0 + go.opentelemetry.io/collector/pdata/pprofile v0.120.0 go.opentelemetry.io/collector/pipeline v0.120.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 go.opentelemetry.io/collector/processor v0.120.0 + go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 go.opentelemetry.io/collector/processor/processortest v0.120.0 + go.opentelemetry.io/collector/processor/xprocessor v0.120.0 go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/metric v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 @@ -50,10 +55,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.120.0 // indirect - go.opentelemetry.io/collector/consumer/xconsumer v0.120.0 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.120.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect - go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.33.0 // indirect @@ -103,3 +105,7 @@ replace go.opentelemetry.io/collector/internal/memorylimiter => ../../internal/m replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/consumererror + +replace go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper => ../processorhelper/xprocessorhelper + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline diff --git a/processor/memorylimiterprocessor/internal/metadata/generated_status.go b/processor/memorylimiterprocessor/internal/metadata/generated_status.go index 0e841608278..e22c704f519 100644 --- a/processor/memorylimiterprocessor/internal/metadata/generated_status.go +++ b/processor/memorylimiterprocessor/internal/metadata/generated_status.go @@ -12,7 +12,8 @@ var ( ) const ( - TracesStability = component.StabilityLevelBeta - MetricsStability = component.StabilityLevelBeta - LogsStability = component.StabilityLevelBeta + ProfilesStability = component.StabilityLevelAlpha + TracesStability = component.StabilityLevelBeta + MetricsStability = component.StabilityLevelBeta + LogsStability = component.StabilityLevelBeta ) diff --git a/processor/memorylimiterprocessor/memorylimiter.go b/processor/memorylimiterprocessor/memorylimiter.go index 45da42e0b32..fbb9f81bf32 100644 --- a/processor/memorylimiterprocessor/memorylimiter.go +++ b/processor/memorylimiterprocessor/memorylimiter.go @@ -10,8 +10,10 @@ import ( "go.opentelemetry.io/collector/internal/memorylimiter" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" ) @@ -50,11 +52,8 @@ func (p *memoryLimiterProcessor) shutdown(ctx context.Context) error { func (p *memoryLimiterProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { numSpans := td.SpanCount() if p.memlimiter.MustRefuse() { - // TODO: actually to be 100% sure that this is "refused" and not "dropped" - // it is necessary to check the pipeline to see if this is directly connected - // to a receiver (ie.: a receiver is on the call stack). For now it - // assumes that the pipeline is properly configured and a receiver is on the - // callstack and that the receiver will correctly retry the refused data again. + // TODO: + // https://github.com/open-telemetry/opentelemetry-collector/issues/12463 p.obsrep.refused(ctx, numSpans, pipeline.SignalTraces) return td, memorylimiter.ErrDataRefused } @@ -68,11 +67,8 @@ func (p *memoryLimiterProcessor) processTraces(ctx context.Context, td ptrace.Tr func (p *memoryLimiterProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { numDataPoints := md.DataPointCount() if p.memlimiter.MustRefuse() { - // TODO: actually to be 100% sure that this is "refused" and not "dropped" - // it is necessary to check the pipeline to see if this is directly connected - // to a receiver (ie.: a receiver is on the call stack). For now it - // assumes that the pipeline is properly configured and a receiver is on the - // callstack. + // TODO: + // https://github.com/open-telemetry/opentelemetry-collector/issues/12463 p.obsrep.refused(ctx, numDataPoints, pipeline.SignalMetrics) return md, memorylimiter.ErrDataRefused } @@ -86,11 +82,8 @@ func (p *memoryLimiterProcessor) processMetrics(ctx context.Context, md pmetric. func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { numRecords := ld.LogRecordCount() if p.memlimiter.MustRefuse() { - // TODO: actually to be 100% sure that this is "refused" and not "dropped" - // it is necessary to check the pipeline to see if this is directly connected - // to a receiver (ie.: a receiver is on the call stack). For now it - // assumes that the pipeline is properly configured and a receiver is on the - // callstack. + // TODO: + // https://github.com/open-telemetry/opentelemetry-collector/issues/12463 p.obsrep.refused(ctx, numRecords, pipeline.SignalLogs) return ld, memorylimiter.ErrDataRefused } @@ -100,3 +93,18 @@ func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs) p.obsrep.accepted(ctx, numRecords, pipeline.SignalLogs) return ld, nil } + +func (p *memoryLimiterProcessor) processProfiles(ctx context.Context, td pprofile.Profiles) (pprofile.Profiles, error) { + numProfiles := td.SampleCount() + if p.memlimiter.MustRefuse() { + // TODO: + // https://github.com/open-telemetry/opentelemetry-collector/issues/12463 + p.obsrep.refused(ctx, numProfiles, xpipeline.SignalProfiles) + return td, memorylimiter.ErrDataRefused + } + + // Even if the next consumer returns error record the data as accepted by + // this processor. + p.obsrep.accepted(ctx, numProfiles, xpipeline.SignalProfiles) + return td, nil +} diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 85a7fdf4fcb..8b542952c88 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -22,12 +22,14 @@ import ( "go.opentelemetry.io/collector/internal/memorylimiter/iruntime" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadatatest" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper" "go.opentelemetry.io/collector/processor/processortest" ) @@ -419,6 +421,96 @@ func TestLogMemoryPressureResponse(t *testing.T) { }) } +// TestProfileMemoryPressureResponse manipulates results from querying memory and +// check expected side effects. +func TestProfileMemoryPressureResponse(t *testing.T) { + pd := pprofile.NewProfiles() + ctx := context.Background() + + tests := []struct { + name string + mlCfg *Config + memAlloc uint64 + expectError bool + }{ + { + name: "Below memAllocLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 1, + }, + memAlloc: 800, + expectError: false, + }, + { + name: "Above memAllocLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 1, + }, + memAlloc: 1800, + expectError: true, + }, + { + name: "Below memSpikeLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 10, + }, + memAlloc: 800, + expectError: false, + }, + { + name: "Above memSpikeLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 11, + }, + memAlloc: 800, + expectError: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + memorylimiter.GetMemoryFn = totalMemory + memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { + ms.Alloc = tt.memAlloc + } + + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(metadata.Type), tt.mlCfg) + require.NoError(t, err) + tp, err := xprocessorhelper.NewProfiles( + context.Background(), + processortest.NewNopSettings(metadata.Type), + tt.mlCfg, + consumertest.NewNop(), + ml.processProfiles, + xprocessorhelper.WithCapabilities(processorCapabilities), + xprocessorhelper.WithStart(ml.start), + xprocessorhelper.WithShutdown(ml.shutdown)) + require.NoError(t, err) + + assert.NoError(t, tp.Start(ctx, &host{})) + ml.memlimiter.CheckMemLimits() + err = tp.ConsumeProfiles(ctx, pd) + if tt.expectError { + assert.Equal(t, memorylimiter.ErrDataRefused, err) + } else { + require.NoError(t, err) + } + assert.NoError(t, tp.Shutdown(ctx)) + }) + } + t.Cleanup(func() { + memorylimiter.GetMemoryFn = iruntime.TotalMemory + memorylimiter.ReadMemStatsFn = runtime.ReadMemStats + }) +} + type host struct { component.Host } diff --git a/processor/memorylimiterprocessor/metadata.yaml b/processor/memorylimiterprocessor/metadata.yaml index d0fb4dfc6eb..870b22863fa 100644 --- a/processor/memorylimiterprocessor/metadata.yaml +++ b/processor/memorylimiterprocessor/metadata.yaml @@ -4,6 +4,7 @@ github_project: open-telemetry/opentelemetry-collector status: class: processor stability: + alpha: [profiles] beta: [traces, metrics, logs] distributions: [core, contrib, k8s]