From da55bc444f1a8170b6f17f62220a2ed520aa5217 Mon Sep 17 00:00:00 2001 From: Israel Blancas Date: Fri, 21 Feb 2025 12:05:16 +0100 Subject: [PATCH] Add profiles support to memorylimiter processor. Signed-off-by: Israel Blancas --- ...ncasa-memorylimiterprocessor-profiles.yaml | 26 ++++++ cmd/otelcorecol/go.mod | 1 + cmd/otelcorecol/go.sum | 2 + processor/memorylimiterprocessor/README.md | 4 +- processor/memorylimiterprocessor/factory.go | 37 +++++++- .../memorylimiterprocessor/factory_test.go | 6 ++ processor/memorylimiterprocessor/go.mod | 8 +- processor/memorylimiterprocessor/go.sum | 4 + .../internal/metadata/generated_status.go | 7 +- .../memorylimiterprocessor/memorylimiter.go | 20 ++++ .../memorylimiter_test.go | 92 +++++++++++++++++++ .../memorylimiterprocessor/metadata.yaml | 1 + 12 files changed, 196 insertions(+), 12 deletions(-) create mode 100644 .chloggen/iblancasa-memorylimiterprocessor-profiles.yaml diff --git a/.chloggen/iblancasa-memorylimiterprocessor-profiles.yaml b/.chloggen/iblancasa-memorylimiterprocessor-profiles.yaml new file mode 100644 index 000000000000..156af71e074f --- /dev/null +++ b/.chloggen/iblancasa-memorylimiterprocessor-profiles.yaml @@ -0,0 +1,26 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: memorylimiterprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: | + Add support for profiles to the memorylimiterprocessor. + +# One or more tracking issues or pull requests related to the change +issues: [12453] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index cfd7723f36a0..798fd5721167 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -118,6 +118,7 @@ require ( go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect go.opentelemetry.io/collector/pipeline v0.120.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 // indirect + go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 // indirect go.opentelemetry.io/collector/processor/processortest v0.120.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.120.0 // indirect diff --git a/cmd/otelcorecol/go.sum b/cmd/otelcorecol/go.sum index 2f380287b983..5b08fe7b288b 100644 --- a/cmd/otelcorecol/go.sum +++ b/cmd/otelcorecol/go.sum @@ -113,6 +113,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 h1:yO9QPJoYrHM62M1nUZRaAltvTZFQIYpl2V1EfBtOp3E= +go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0/go.mod h1:ALMYNJNnGvGEiHrbLHzg4xV3Cjs2Y/V2C2capETsRmY= go.opentelemetry.io/contrib/bridges/otelzap v0.9.0 h1:f+xpAfhQTjR8beiSMe1bnT/25PkeyWmOcI+SjXWguNw= go.opentelemetry.io/contrib/bridges/otelzap v0.9.0/go.mod h1:T1Z1jyS5FttgQoF6UcGhnM+gF9wU32B4lHO69nXw4FE= go.opentelemetry.io/contrib/config v0.14.0 h1:QAG8uHNp5ZiCkpT7XggSmg5AyW1sA0LgypMoXgBB1+4= diff --git a/processor/memorylimiterprocessor/README.md b/processor/memorylimiterprocessor/README.md index 661d86508c1a..492f0e1d73b9 100644 --- a/processor/memorylimiterprocessor/README.md +++ b/processor/memorylimiterprocessor/README.md @@ -3,10 +3,12 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: traces, metrics, logs | +| Stability | [alpha]: profiles | +| | [beta]: traces, metrics, logs | | Distributions | [core], [contrib], [k8s] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fmemorylimiter%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fmemorylimiter%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) | +[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha [beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta [core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib diff --git a/processor/memorylimiterprocessor/factory.go b/processor/memorylimiterprocessor/factory.go index d65246c3940d..78a2a1187a83 100644 --- a/processor/memorylimiterprocessor/factory.go +++ b/processor/memorylimiterprocessor/factory.go @@ -11,11 +11,14 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/internal/telemetry/componentattribute" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper" + "go.opentelemetry.io/collector/processor/xprocessor" ) var processorCapabilities = consumer.Capabilities{MutatesData: false} @@ -28,16 +31,17 @@ type factory struct { } // NewFactory returns a new factory for the Memory Limiter processor. -func NewFactory() processor.Factory { +func NewFactory() xprocessor.Factory { f := &factory{ memoryLimiters: map[component.Config]*memoryLimiterProcessor{}, } - return processor.NewFactory( + return xprocessor.NewFactory( metadata.Type, createDefaultConfig, - processor.WithTraces(f.createTraces, metadata.TracesStability), - processor.WithMetrics(f.createMetrics, metadata.MetricsStability), - processor.WithLogs(f.createLogs, metadata.LogsStability)) + xprocessor.WithTraces(f.createTraces, metadata.TracesStability), + xprocessor.WithMetrics(f.createMetrics, metadata.MetricsStability), + xprocessor.WithLogs(f.createLogs, metadata.LogsStability), + xprocessor.WithProfiles(f.createProfiles, metadata.ProfilesStability)) } // CreateDefaultConfig creates the default configuration for processor. Notice @@ -97,6 +101,29 @@ func (f *factory) createLogs( processorhelper.WithShutdown(memLimiter.shutdown)) } +func (f *factory) createProfiles( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer xconsumer.Profiles, +) (xprocessor.Profiles, error) { + memLimiter, err := f.getMemoryLimiter(set, cfg) + if err != nil { + return nil, err + } + + return xprocessorhelper.NewProfiles( + ctx, + set, + cfg, + nextConsumer, + memLimiter.processProfiles, + xprocessorhelper.WithCapabilities(processorCapabilities), + xprocessorhelper.WithStart(memLimiter.start), + xprocessorhelper.WithShutdown(memLimiter.shutdown), + ) +} + // getMemoryLimiter checks if we have a cached memoryLimiter with a specific config, // otherwise initialize and add one to the store. func (f *factory) getMemoryLimiter(set processor.Settings, cfg component.Config) (*memoryLimiterProcessor, error) { diff --git a/processor/memorylimiterprocessor/factory_test.go b/processor/memorylimiterprocessor/factory_test.go index 921fd5dd2eac..24e691966997 100644 --- a/processor/memorylimiterprocessor/factory_test.go +++ b/processor/memorylimiterprocessor/factory_test.go @@ -70,9 +70,15 @@ func TestCreateProcessor(t *testing.T) { assert.NotNil(t, lp) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + pp, err := factory.CreateProfiles(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NotNil(t, pp) + assert.NoError(t, pp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, lp.Shutdown(context.Background())) assert.NoError(t, tp.Shutdown(context.Background())) assert.NoError(t, mp.Shutdown(context.Background())) + assert.NoError(t, pp.Shutdown(context.Background())) // verify that no monitoring routine is running require.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted) diff --git a/processor/memorylimiterprocessor/go.mod b/processor/memorylimiterprocessor/go.mod index 1dafa679120d..dc1d712f0ed1 100644 --- a/processor/memorylimiterprocessor/go.mod +++ b/processor/memorylimiterprocessor/go.mod @@ -10,12 +10,17 @@ require ( go.opentelemetry.io/collector/consumer v1.26.0 go.opentelemetry.io/collector/consumer/consumererror v0.120.0 go.opentelemetry.io/collector/consumer/consumertest v0.120.0 + go.opentelemetry.io/collector/consumer/xconsumer v0.120.0 go.opentelemetry.io/collector/internal/memorylimiter v0.120.0 go.opentelemetry.io/collector/internal/telemetry v0.120.0 go.opentelemetry.io/collector/pdata v1.26.0 + go.opentelemetry.io/collector/pdata/pprofile v0.120.0 go.opentelemetry.io/collector/pipeline v0.120.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 go.opentelemetry.io/collector/processor v0.120.0 + go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 go.opentelemetry.io/collector/processor/processortest v0.120.0 + go.opentelemetry.io/collector/processor/xprocessor v0.120.0 go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/metric v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 @@ -50,10 +55,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.120.0 // indirect - go.opentelemetry.io/collector/consumer/xconsumer v0.120.0 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.120.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect - go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.33.0 // indirect diff --git a/processor/memorylimiterprocessor/go.sum b/processor/memorylimiterprocessor/go.sum index dab393654bc7..6942e6a996d6 100644 --- a/processor/memorylimiterprocessor/go.sum +++ b/processor/memorylimiterprocessor/go.sum @@ -69,6 +69,10 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 h1:klY22BaRMO1+JmjUu0Af961hpHA5qnOTAVR7tN+UTW8= +go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0/go.mod h1:K/7Ki7toZQpNV0GF7TbrOEoo8dP3dDXKKSRNnTyEsBE= +go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 h1:yO9QPJoYrHM62M1nUZRaAltvTZFQIYpl2V1EfBtOp3E= +go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0/go.mod h1:ALMYNJNnGvGEiHrbLHzg4xV3Cjs2Y/V2C2capETsRmY= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= diff --git a/processor/memorylimiterprocessor/internal/metadata/generated_status.go b/processor/memorylimiterprocessor/internal/metadata/generated_status.go index 0e8416082780..e22c704f519f 100644 --- a/processor/memorylimiterprocessor/internal/metadata/generated_status.go +++ b/processor/memorylimiterprocessor/internal/metadata/generated_status.go @@ -12,7 +12,8 @@ var ( ) const ( - TracesStability = component.StabilityLevelBeta - MetricsStability = component.StabilityLevelBeta - LogsStability = component.StabilityLevelBeta + ProfilesStability = component.StabilityLevelAlpha + TracesStability = component.StabilityLevelBeta + MetricsStability = component.StabilityLevelBeta + LogsStability = component.StabilityLevelBeta ) diff --git a/processor/memorylimiterprocessor/memorylimiter.go b/processor/memorylimiterprocessor/memorylimiter.go index 45da42e0b327..916bc1e0c6a9 100644 --- a/processor/memorylimiterprocessor/memorylimiter.go +++ b/processor/memorylimiterprocessor/memorylimiter.go @@ -10,8 +10,10 @@ import ( "go.opentelemetry.io/collector/internal/memorylimiter" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" ) @@ -100,3 +102,21 @@ func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs) p.obsrep.accepted(ctx, numRecords, pipeline.SignalLogs) return ld, nil } + +func (p *memoryLimiterProcessor) processProfiles(ctx context.Context, td pprofile.Profiles) (pprofile.Profiles, error) { + numProfiles := td.SampleCount() + if p.memlimiter.MustRefuse() { + // TODO: actually to be 100% sure that this is "refused" and not "dropped" + // it is necessary to check the pipeline to see if this is directly connected + // to a receiver (ie.: a receiver is on the call stack). For now it + // assumes that the pipeline is properly configured and a receiver is on the + // callstack and that the receiver will correctly retry the refused data again. + p.obsrep.refused(ctx, numProfiles, xpipeline.SignalProfiles) + return td, memorylimiter.ErrDataRefused + } + + // Even if the next consumer returns error record the data as accepted by + // this processor. + p.obsrep.accepted(ctx, numProfiles, xpipeline.SignalProfiles) + return td, nil +} diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 518e0c344b7b..9d807e78cdb4 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -22,12 +22,14 @@ import ( "go.opentelemetry.io/collector/internal/memorylimiter/iruntime" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadatatest" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper" "go.opentelemetry.io/collector/processor/processortest" ) @@ -419,6 +421,96 @@ func TestLogMemoryPressureResponse(t *testing.T) { }) } +// TestProfileMemoryPressureResponse manipulates results from querying memory and +// check expected side effects. +func TestProfileMemoryPressureResponse(t *testing.T) { + pd := pprofile.NewProfiles() + ctx := context.Background() + + tests := []struct { + name string + mlCfg *Config + memAlloc uint64 + expectError bool + }{ + { + name: "Below memAllocLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 1, + }, + memAlloc: 800, + expectError: false, + }, + { + name: "Above memAllocLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 1, + }, + memAlloc: 1800, + expectError: true, + }, + { + name: "Below memSpikeLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 10, + }, + memAlloc: 800, + expectError: false, + }, + { + name: "Above memSpikeLimit", + mlCfg: &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 11, + }, + memAlloc: 800, + expectError: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + memorylimiter.GetMemoryFn = totalMemory + memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { + ms.Alloc = tt.memAlloc + } + + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettingsWithType(metadata.Type), tt.mlCfg) + require.NoError(t, err) + tp, err := xprocessorhelper.NewProfiles( + context.Background(), + processortest.NewNopSettingsWithType(metadata.Type), + tt.mlCfg, + consumertest.NewNop(), + ml.processProfiles, + xprocessorhelper.WithCapabilities(processorCapabilities), + xprocessorhelper.WithStart(ml.start), + xprocessorhelper.WithShutdown(ml.shutdown)) + require.NoError(t, err) + + assert.NoError(t, tp.Start(ctx, &host{})) + ml.memlimiter.CheckMemLimits() + err = tp.ConsumeProfiles(ctx, pd) + if tt.expectError { + assert.Equal(t, memorylimiter.ErrDataRefused, err) + } else { + require.NoError(t, err) + } + assert.NoError(t, tp.Shutdown(ctx)) + }) + } + t.Cleanup(func() { + memorylimiter.GetMemoryFn = iruntime.TotalMemory + memorylimiter.ReadMemStatsFn = runtime.ReadMemStats + }) +} + type host struct { component.Host } diff --git a/processor/memorylimiterprocessor/metadata.yaml b/processor/memorylimiterprocessor/metadata.yaml index d0fb4dfc6ebb..870b22863fac 100644 --- a/processor/memorylimiterprocessor/metadata.yaml +++ b/processor/memorylimiterprocessor/metadata.yaml @@ -4,6 +4,7 @@ github_project: open-telemetry/opentelemetry-collector status: class: processor stability: + alpha: [profiles] beta: [traces, metrics, logs] distributions: [core, contrib, k8s]