Skip to content

Commit

Permalink
[processor/memorylimiter] Add profiles support to memorylimiter proce…
Browse files Browse the repository at this point in the history
…ssor (#12454)

#### Description
Add profiles support to the memorylimiter processor.

#### Link to tracking issue
Fixes #12453

Signed-off-by: Israel Blancas <[email protected]>
  • Loading branch information
iblancasa authored Feb 27, 2025
1 parent 3c72d35 commit 4397725
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: memorylimiterprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for profiles.

# One or more tracking issues or pull requests related to the change
issues: [12453]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions cmd/builder/internal/builder/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var replaceModules = []string{
"/processor/processortest",
"/processor/batchprocessor",
"/processor/memorylimiterprocessor",
"/processor/processorhelper/xprocessorhelper",
"/processor/xprocessor",
"/receiver",
"/receiver/nopreceiver",
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ replaces:
- go.opentelemetry.io/collector/processor/batchprocessor => ../../processor/batchprocessor
- go.opentelemetry.io/collector/processor/memorylimiterprocessor => ../../processor/memorylimiterprocessor
- go.opentelemetry.io/collector/processor/xprocessor => ../../processor/xprocessor
- go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper => ../../processor/processorhelper/xprocessorhelper
- go.opentelemetry.io/collector/receiver => ../../receiver
- go.opentelemetry.io/collector/receiver/nopreceiver => ../../receiver/nopreceiver
- go.opentelemetry.io/collector/receiver/otlpreceiver => ../../receiver/otlpreceiver
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ require (
go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect
go.opentelemetry.io/collector/pipeline v0.120.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 // indirect
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 // indirect
go.opentelemetry.io/collector/processor/processortest v0.120.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect
go.opentelemetry.io/collector/receiver/receivertest v0.120.0 // indirect
Expand Down Expand Up @@ -287,6 +288,8 @@ replace go.opentelemetry.io/collector/processor/memorylimiterprocessor => ../../

replace go.opentelemetry.io/collector/processor/xprocessor => ../../processor/xprocessor

replace go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper => ../../processor/processorhelper/xprocessorhelper

replace go.opentelemetry.io/collector/receiver => ../../receiver

replace go.opentelemetry.io/collector/receiver/nopreceiver => ../../receiver/nopreceiver
Expand Down
4 changes: 3 additions & 1 deletion processor/memorylimiterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: traces, metrics, logs |
| Stability | [alpha]: profiles |
| | [beta]: traces, metrics, logs |
| Distributions | [core], [contrib], [k8s] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fmemorylimiter%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fmemorylimiter%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
Expand Down
37 changes: 32 additions & 5 deletions processor/memorylimiterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
"go.opentelemetry.io/collector/processor/xprocessor"
)

var processorCapabilities = consumer.Capabilities{MutatesData: false}
Expand All @@ -29,16 +32,17 @@ type factory struct {
}

// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() processor.Factory {
func NewFactory() xprocessor.Factory {
f := &factory{
memoryLimiters: map[component.Config]*memoryLimiterProcessor{},
}
return processor.NewFactory(
return xprocessor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithTraces(f.createTraces, metadata.TracesStability),
processor.WithMetrics(f.createMetrics, metadata.MetricsStability),
processor.WithLogs(f.createLogs, metadata.LogsStability))
xprocessor.WithTraces(f.createTraces, metadata.TracesStability),
xprocessor.WithMetrics(f.createMetrics, metadata.MetricsStability),
xprocessor.WithLogs(f.createLogs, metadata.LogsStability),
xprocessor.WithProfiles(f.createProfiles, metadata.ProfilesStability))
}

// CreateDefaultConfig creates the default configuration for processor. Notice
Expand Down Expand Up @@ -98,6 +102,29 @@ func (f *factory) createLogs(
processorhelper.WithShutdown(memLimiter.shutdown))
}

func (f *factory) createProfiles(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer xconsumer.Profiles,
) (xprocessor.Profiles, error) {
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}

return xprocessorhelper.NewProfiles(
ctx,
set,
cfg,
nextConsumer,
memLimiter.processProfiles,
xprocessorhelper.WithCapabilities(processorCapabilities),
xprocessorhelper.WithStart(memLimiter.start),
xprocessorhelper.WithShutdown(memLimiter.shutdown),
)
}

// getMemoryLimiter checks if we have a cached memoryLimiter with a specific config,
// otherwise initialize and add one to the store.
func (f *factory) getMemoryLimiter(set processor.Settings, cfg component.Config) (*memoryLimiterProcessor, error) {
Expand Down
8 changes: 7 additions & 1 deletion processor/memorylimiterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,17 @@ func TestCreateProcessor(t *testing.T) {
lp, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NotNil(t, lp)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))

pp, err := factory.CreateProfiles(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NotNil(t, pp)
assert.NoError(t, pp.Start(context.Background(), componenttest.NewNopHost()))

assert.NoError(t, lp.Shutdown(context.Background()))
assert.NoError(t, tp.Shutdown(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
assert.NoError(t, pp.Shutdown(context.Background()))
// verify that no monitoring routine is running
require.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted)

Expand Down
12 changes: 9 additions & 3 deletions processor/memorylimiterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ require (
go.opentelemetry.io/collector/consumer v1.26.0
go.opentelemetry.io/collector/consumer/consumererror v0.120.0
go.opentelemetry.io/collector/consumer/consumertest v0.120.0
go.opentelemetry.io/collector/consumer/xconsumer v0.120.0
go.opentelemetry.io/collector/internal/memorylimiter v0.120.0
go.opentelemetry.io/collector/internal/telemetry v0.120.0
go.opentelemetry.io/collector/pdata v1.26.0
go.opentelemetry.io/collector/pdata/pprofile v0.120.0
go.opentelemetry.io/collector/pipeline v0.120.0
go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0
go.opentelemetry.io/collector/processor v0.120.0
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0
go.opentelemetry.io/collector/processor/processortest v0.120.0
go.opentelemetry.io/collector/processor/xprocessor v0.120.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
Expand Down Expand Up @@ -50,10 +55,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.120.0 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.120.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.120.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.33.0 // indirect
Expand Down Expand Up @@ -103,3 +105,7 @@ replace go.opentelemetry.io/collector/internal/memorylimiter => ../../internal/m
replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry

replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/consumererror

replace go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper => ../processorhelper/xprocessorhelper

replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 23 additions & 15 deletions processor/memorylimiterprocessor/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/xpipeline"
"go.opentelemetry.io/collector/processor"
)

Expand Down Expand Up @@ -50,11 +52,8 @@ func (p *memoryLimiterProcessor) shutdown(ctx context.Context) error {
func (p *memoryLimiterProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
numSpans := td.SpanCount()
if p.memlimiter.MustRefuse() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack and that the receiver will correctly retry the refused data again.
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numSpans, pipeline.SignalTraces)
return td, memorylimiter.ErrDataRefused
}
Expand All @@ -68,11 +67,8 @@ func (p *memoryLimiterProcessor) processTraces(ctx context.Context, td ptrace.Tr
func (p *memoryLimiterProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
numDataPoints := md.DataPointCount()
if p.memlimiter.MustRefuse() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack.
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numDataPoints, pipeline.SignalMetrics)
return md, memorylimiter.ErrDataRefused
}
Expand All @@ -86,11 +82,8 @@ func (p *memoryLimiterProcessor) processMetrics(ctx context.Context, md pmetric.
func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
numRecords := ld.LogRecordCount()
if p.memlimiter.MustRefuse() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack.
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numRecords, pipeline.SignalLogs)
return ld, memorylimiter.ErrDataRefused
}
Expand All @@ -100,3 +93,18 @@ func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs)
p.obsrep.accepted(ctx, numRecords, pipeline.SignalLogs)
return ld, nil
}

func (p *memoryLimiterProcessor) processProfiles(ctx context.Context, td pprofile.Profiles) (pprofile.Profiles, error) {
numProfiles := td.SampleCount()
if p.memlimiter.MustRefuse() {
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numProfiles, xpipeline.SignalProfiles)
return td, memorylimiter.ErrDataRefused
}

// Even if the next consumer returns error record the data as accepted by
// this processor.
p.obsrep.accepted(ctx, numProfiles, xpipeline.SignalProfiles)
return td, nil
}
92 changes: 92 additions & 0 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"go.opentelemetry.io/collector/internal/memorylimiter/iruntime"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadatatest"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
"go.opentelemetry.io/collector/processor/processortest"
)

Expand Down Expand Up @@ -419,6 +421,96 @@ func TestLogMemoryPressureResponse(t *testing.T) {
})
}

// TestProfileMemoryPressureResponse manipulates results from querying memory and
// check expected side effects.
func TestProfileMemoryPressureResponse(t *testing.T) {
pd := pprofile.NewProfiles()
ctx := context.Background()

tests := []struct {
name string
mlCfg *Config
memAlloc uint64
expectError bool
}{
{
name: "Below memAllocLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
memAlloc: 800,
expectError: false,
},
{
name: "Above memAllocLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
memAlloc: 1800,
expectError: true,
},
{
name: "Below memSpikeLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 10,
},
memAlloc: 800,
expectError: false,
},
{
name: "Above memSpikeLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 11,
},
memAlloc: 800,
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
memorylimiter.GetMemoryFn = totalMemory
memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) {
ms.Alloc = tt.memAlloc
}

ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(metadata.Type), tt.mlCfg)
require.NoError(t, err)
tp, err := xprocessorhelper.NewProfiles(
context.Background(),
processortest.NewNopSettings(metadata.Type),
tt.mlCfg,
consumertest.NewNop(),
ml.processProfiles,
xprocessorhelper.WithCapabilities(processorCapabilities),
xprocessorhelper.WithStart(ml.start),
xprocessorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)

assert.NoError(t, tp.Start(ctx, &host{}))
ml.memlimiter.CheckMemLimits()
err = tp.ConsumeProfiles(ctx, pd)
if tt.expectError {
assert.Equal(t, memorylimiter.ErrDataRefused, err)
} else {
require.NoError(t, err)
}
assert.NoError(t, tp.Shutdown(ctx))
})
}
t.Cleanup(func() {
memorylimiter.GetMemoryFn = iruntime.TotalMemory
memorylimiter.ReadMemStatsFn = runtime.ReadMemStats
})
}

type host struct {
component.Host
}
Expand Down
Loading

0 comments on commit 4397725

Please sign in to comment.