diff --git a/.chloggen/mx-psi_type-mismatch.yaml b/.chloggen/mx-psi_type-mismatch.yaml new file mode 100644 index 00000000000..117603187c2 --- /dev/null +++ b/.chloggen/mx-psi_type-mismatch.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor, connector, exporter, receiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Explicitly error out at component creation time if there is a type mismatch. + +# One or more tracking issues or pull requests related to the change +issues: [12305] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/connector/connector.go b/connector/connector.go index ae930f30626..9c9f113b64a 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -305,6 +305,11 @@ func (f *factory) CreateTracesToTraces(ctx context.Context, set Settings, cfg co if f.createTracesToTracesFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, pipeline.SignalTraces) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createTracesToTracesFunc(ctx, set, cfg, next) } @@ -312,6 +317,11 @@ func (f *factory) CreateTracesToMetrics(ctx context.Context, set Settings, cfg c if f.createTracesToMetricsFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, pipeline.SignalMetrics) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createTracesToMetricsFunc(ctx, set, cfg, next) } @@ -319,6 +329,11 @@ func (f *factory) CreateTracesToLogs(ctx context.Context, set Settings, cfg comp if f.createTracesToLogsFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, pipeline.SignalLogs) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createTracesToLogsFunc(ctx, set, cfg, next) } @@ -326,6 +341,11 @@ func (f *factory) CreateMetricsToTraces(ctx context.Context, set Settings, cfg c if f.createMetricsToTracesFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, pipeline.SignalTraces) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createMetricsToTracesFunc(ctx, set, cfg, next) } @@ -333,6 +353,11 @@ func (f *factory) CreateMetricsToMetrics(ctx context.Context, set Settings, cfg if f.createMetricsToMetricsFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, pipeline.SignalMetrics) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createMetricsToMetricsFunc(ctx, set, cfg, next) } @@ -340,6 +365,11 @@ func (f *factory) CreateMetricsToLogs(ctx context.Context, set Settings, cfg com if f.createMetricsToLogsFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, pipeline.SignalLogs) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createMetricsToLogsFunc(ctx, set, cfg, next) } @@ -347,6 +377,11 @@ func (f *factory) CreateLogsToTraces(ctx context.Context, set Settings, cfg comp if f.createLogsToTracesFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, pipeline.SignalTraces) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createLogsToTracesFunc(ctx, set, cfg, next) } @@ -354,6 +389,11 @@ func (f *factory) CreateLogsToMetrics(ctx context.Context, set Settings, cfg com if f.createLogsToMetricsFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, pipeline.SignalMetrics) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createLogsToMetricsFunc(ctx, set, cfg, next) } @@ -361,6 +401,11 @@ func (f *factory) CreateLogsToLogs(ctx context.Context, set Settings, cfg compon if f.createLogsToLogsFunc == nil { return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, pipeline.SignalLogs) } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createLogsToLogsFunc(ctx, set, cfg, next) } diff --git a/connector/connector_test.go b/connector/connector_test.go index 28b59dc78bd..1853c9295f8 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -19,7 +19,7 @@ import ( var ( testType = component.MustNewType("test") - testID = component.MustNewIDWithName("type", "name") + testID = component.MustNewIDWithName("test", "name") ) func TestNewFactoryNoOptions(t *testing.T) { @@ -58,18 +58,26 @@ func TestNewFactoryWithSameTypes(t *testing.T) { WithLogsToLogs(createLogsToLogs, component.StabilityLevelUnmaintained)) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) + wrongID := component.MustNewID("wrong") + wrongIDErrStr := internal.ErrIDMismatch(wrongID, testType).Error() assert.Equal(t, component.StabilityLevelAlpha, factory.TracesToTracesStability()) _, err := factory.CreateTracesToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) + _, err = factory.CreateTracesToTraces(context.Background(), Settings{ID: wrongID}, &defaultCfg, consumertest.NewNop()) + require.ErrorContains(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsToMetricsStability()) _, err = factory.CreateMetricsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) + _, err = factory.CreateMetricsToMetrics(context.Background(), Settings{ID: wrongID}, &defaultCfg, consumertest.NewNop()) + require.ErrorContains(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsToLogsStability()) _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) + _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: wrongID}, &defaultCfg, consumertest.NewNop()) + require.ErrorContains(t, err, wrongIDErrStr) _, err = factory.CreateTracesToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalTraces, pipeline.SignalMetrics)) @@ -147,33 +155,33 @@ func TestNewFactoryWithAllTypes(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) assert.Equal(t, component.StabilityLevelAlpha, factory.TracesToTracesStability()) - _, err := factory.CreateTracesToTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err := factory.CreateTracesToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelDevelopment, factory.TracesToMetricsStability()) - _, err = factory.CreateTracesToMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateTracesToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelAlpha, factory.TracesToLogsStability()) - _, err = factory.CreateTracesToLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateTracesToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsToTracesStability()) - _, err = factory.CreateMetricsToTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateMetricsToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsToMetricsStability()) - _, err = factory.CreateMetricsToMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateMetricsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelStable, factory.MetricsToLogsStability()) - _, err = factory.CreateMetricsToLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateMetricsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelDeprecated, factory.LogsToTracesStability()) - _, err = factory.CreateLogsToTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateLogsToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsToMetricsStability()) - _, err = factory.CreateLogsToMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateLogsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsToLogsStability()) - _, err = factory.CreateLogsToLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) } diff --git a/connector/internal/factory.go b/connector/internal/factory.go index b865f6311cb..764ba43fdee 100644 --- a/connector/internal/factory.go +++ b/connector/internal/factory.go @@ -13,3 +13,7 @@ import ( func ErrDataTypes(id component.ID, from, to pipeline.Signal) error { return fmt.Errorf("connector %q cannot connect from %s to %s: %w", id, from, to, pipeline.ErrSignalNotSupported) } + +func ErrIDMismatch(id component.ID, typ component.Type) error { + return fmt.Errorf("component type mismatch: component ID %q does not have type %q", id, typ) +} diff --git a/exporter/exporter.go b/exporter/exporter.go index c66e27397c9..6542d9ef802 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/internal/experr" "go.opentelemetry.io/collector/pipeline" ) @@ -132,6 +133,10 @@ func (f *factory) CreateTraces(ctx context.Context, set Settings, cfg component. return nil, pipeline.ErrSignalNotSupported } + if set.ID.Type() != f.Type() { + return nil, experr.ErrIDMismatch(set.ID, f.Type()) + } + return f.createTracesFunc(ctx, set, cfg) } @@ -140,6 +145,10 @@ func (f *factory) CreateMetrics(ctx context.Context, set Settings, cfg component return nil, pipeline.ErrSignalNotSupported } + if set.ID.Type() != f.Type() { + return nil, experr.ErrIDMismatch(set.ID, f.Type()) + } + return f.createMetricsFunc(ctx, set, cfg) } @@ -148,6 +157,10 @@ func (f *factory) CreateLogs(ctx context.Context, set Settings, cfg component.Co return nil, pipeline.ErrSignalNotSupported } + if set.ID.Type() != f.Type() { + return nil, experr.ErrIDMismatch(set.ID, f.Type()) + } + return f.createLogsFunc(ctx, set, cfg) } diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index 79a9db0ef67..4fbf0c9c990 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -12,27 +12,31 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/exporter/internal/experr" "go.opentelemetry.io/collector/pipeline" ) +var ( + testType = component.MustNewType("test") + testID = component.NewID(testType) +) + func TestNewFactory(t *testing.T) { - testType := component.MustNewType("test") defaultCfg := struct{}{} f := NewFactory( testType, func() component.Config { return &defaultCfg }) assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) - _, err := f.CreateTraces(context.Background(), Settings{}, &defaultCfg) + _, err := f.CreateTraces(context.Background(), Settings{ID: testID}, &defaultCfg) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) - _, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg) + _, err = f.CreateMetrics(context.Background(), Settings{ID: testID}, &defaultCfg) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) - _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg) + _, err = f.CreateLogs(context.Background(), Settings{ID: testID}, &defaultCfg) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) } func TestNewFactoryWithOptions(t *testing.T) { - testType := component.MustNewType("test") defaultCfg := struct{}{} f := NewFactory( testType, @@ -43,17 +47,26 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) + wrongID := component.MustNewID("wrong") + wrongIDErrStr := experr.ErrIDMismatch(wrongID, testType).Error() + assert.Equal(t, component.StabilityLevelDevelopment, f.TracesStability()) - _, err := f.CreateTraces(context.Background(), Settings{}, &defaultCfg) + _, err := f.CreateTraces(context.Background(), Settings{ID: testID}, &defaultCfg) require.NoError(t, err) + _, err = f.CreateTraces(context.Background(), Settings{ID: wrongID}, &defaultCfg) + require.EqualError(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelAlpha, f.MetricsStability()) - _, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg) + _, err = f.CreateMetrics(context.Background(), Settings{ID: testID}, &defaultCfg) require.NoError(t, err) + _, err = f.CreateMetrics(context.Background(), Settings{ID: wrongID}, &defaultCfg) + require.EqualError(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelDeprecated, f.LogsStability()) - _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg) - assert.NoError(t, err) + _, err = f.CreateLogs(context.Background(), Settings{ID: testID}, &defaultCfg) + require.NoError(t, err) + _, err = f.CreateLogs(context.Background(), Settings{ID: wrongID}, &defaultCfg) + require.EqualError(t, err, wrongIDErrStr) } var nopInstance = &nop{ diff --git a/exporter/exportertest/contract_checker.go b/exporter/exportertest/contract_checker.go index 7a3a61ff5f4..bc0eb9324b2 100644 --- a/exporter/exportertest/contract_checker.go +++ b/exporter/exportertest/contract_checker.go @@ -151,7 +151,7 @@ func checkTraces(t *testing.T, params CheckConsumeContractParams, mockReceiver c ctx := context.Background() var exp exporter.Traces var err error - exp, err = params.ExporterFactory.CreateTraces(ctx, NewNopSettings(NopType), params.ExporterConfig) + exp, err = params.ExporterFactory.CreateTraces(ctx, NewNopSettings(params.ExporterFactory.Type()), params.ExporterConfig) require.NoError(t, err) require.NotNil(t, exp) @@ -191,7 +191,7 @@ func checkLogs(t *testing.T, params CheckConsumeContractParams, mockReceiver com ctx := context.Background() var exp exporter.Logs var err error - exp, err = params.ExporterFactory.CreateLogs(ctx, NewNopSettings(NopType), params.ExporterConfig) + exp, err = params.ExporterFactory.CreateLogs(ctx, NewNopSettings(params.ExporterFactory.Type()), params.ExporterConfig) require.NoError(t, err) require.NotNil(t, exp) diff --git a/exporter/internal/experr/err.go b/exporter/internal/experr/err.go index 6bff64b162d..f8b109d7de7 100644 --- a/exporter/internal/experr/err.go +++ b/exporter/internal/experr/err.go @@ -3,7 +3,12 @@ package experr // import "go.opentelemetry.io/collector/exporter/internal/experr" -import "errors" +import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" +) type shutdownErr struct { err error @@ -25,3 +30,7 @@ func IsShutdownErr(err error) bool { var sdErr shutdownErr return errors.As(err, &sdErr) } + +func ErrIDMismatch(id component.ID, typ component.Type) error { + return fmt.Errorf("component type mismatch: component ID %q does not have type %q", id, typ) +} diff --git a/exporter/xexporter/exporter.go b/exporter/xexporter/exporter.go index 8add9a9aba8..a3010008041 100644 --- a/exporter/xexporter/exporter.go +++ b/exporter/xexporter/exporter.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/internal/experr" "go.opentelemetry.io/collector/pipeline" ) @@ -95,6 +96,9 @@ func (f *factory) CreateProfiles(ctx context.Context, set exporter.Settings, cfg return nil, pipeline.ErrSignalNotSupported } + if set.ID.Type() != f.Type() { + return nil, experr.ErrIDMismatch(set.ID, f.Type()) + } return f.createProfilesFunc(ctx, set, cfg) } diff --git a/exporter/xexporter/exporter_test.go b/exporter/xexporter/exporter_test.go index d1929c4c22e..1ac8d0886db 100644 --- a/exporter/xexporter/exporter_test.go +++ b/exporter/xexporter/exporter_test.go @@ -8,12 +8,16 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/internal/experr" ) +var testID = component.MustNewID("test") + func TestNewFactoryWithProfiles(t *testing.T) { testType := component.MustNewType("test") defaultCfg := struct{}{} @@ -26,8 +30,13 @@ func TestNewFactoryWithProfiles(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) assert.Equal(t, component.StabilityLevelDevelopment, factory.ProfilesStability()) - _, err := factory.CreateProfiles(context.Background(), exporter.Settings{}, &defaultCfg) - assert.NoError(t, err) + _, err := factory.CreateProfiles(context.Background(), exporter.Settings{ID: testID}, &defaultCfg) + require.NoError(t, err) + + wrongID := component.MustNewID("wrong") + wrongIDErrStr := experr.ErrIDMismatch(wrongID, testType).Error() + _, err = factory.CreateProfiles(context.Background(), exporter.Settings{ID: wrongID}, &defaultCfg) + assert.EqualError(t, err, wrongIDErrStr) } var nopInstance = &nop{ diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index d1321c1a7a5..61af21f10ec 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata" "go.opentelemetry.io/collector/processor/batchprocessor/internal/metadatatest" "go.opentelemetry.io/collector/processor/processortest" ) @@ -35,7 +36,7 @@ func TestProcessorShutdown(t *testing.T) { factory := NewFactory() ctx := context.Background() - processorCreationSet := processortest.NewNopSettings(processortest.NopType) + processorCreationSet := processortest.NewNopSettings(metadata.Type) for i := 0; i < 5; i++ { require.NotPanics(t, func() { @@ -62,7 +63,7 @@ func TestProcessorLifecycle(t *testing.T) { factory := NewFactory() ctx := context.Background() - processorCreationSet := processortest.NewNopSettings(processortest.NopType) + processorCreationSet := processortest.NewNopSettings(metadata.Type) for i := 0; i < 5; i++ { tProc, err := factory.CreateTraces(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) @@ -86,7 +87,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) @@ -127,7 +128,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 - traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) @@ -368,7 +369,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { spansPerRequest := 10 start := time.Now() - traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) @@ -413,7 +414,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { } sink := new(consumertest.TracesSink) - traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) @@ -442,7 +443,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { metricsPerRequest := 5 sink := new(consumertest.MetricsSink) - metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) @@ -603,7 +604,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) @@ -650,7 +651,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) @@ -746,7 +747,7 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) { func runMetricsProcessorBenchmark(b *testing.B, cfg *Config) { ctx := context.Background() sink := new(metricsSink) - metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(b, err) require.NoError(b, metrics.Start(ctx, componenttest.NewNopHost())) @@ -792,7 +793,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) @@ -931,7 +932,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) @@ -978,7 +979,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) @@ -1055,7 +1056,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { cfg.SendBatchSize = 1000 cfg.Timeout = 10 * time.Minute cfg.MetadataKeys = []string{"token1", "token2"} - traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) @@ -1146,7 +1147,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.MetadataKeys = []string{"token"} cfg.MetadataCardinalityLimit = cardLimit - traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) @@ -1187,7 +1188,7 @@ func TestBatchZeroConfig(t *testing.T) { const requestCount = 5 const logsPerRequest = 10 sink := new(consumertest.LogsSink) - logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) defer func() { require.NoError(t, logs.Shutdown(context.Background())) }() @@ -1226,7 +1227,7 @@ func TestBatchSplitOnly(t *testing.T) { require.NoError(t, cfg.Validate()) sink := new(consumertest.LogsSink) - logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(processortest.NopType), cfg, sink) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(metadata.Type), cfg, sink) require.NoError(t, err) require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) defer func() { require.NoError(t, logs.Shutdown(context.Background())) }() diff --git a/processor/internal/err.go b/processor/internal/err.go new file mode 100644 index 00000000000..aa412bba43c --- /dev/null +++ b/processor/internal/err.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/processor/internal" + +import ( + "fmt" + + "go.opentelemetry.io/collector/component" +) + +func ErrIDMismatch(id component.ID, typ component.Type) error { + return fmt.Errorf("component type mismatch: component ID %q does not have type %q", id, typ) +} diff --git a/processor/processor.go b/processor/processor.go index 06d2239ec58..c54154fe9f2 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor/internal" ) // Traces is a processor that can consume traces. @@ -125,6 +126,11 @@ func (f *factory) CreateTraces(ctx context.Context, set Settings, cfg component. if f.createTracesFunc == nil { return nil, pipeline.ErrSignalNotSupported } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createTracesFunc(ctx, set, cfg, next) } @@ -132,6 +138,11 @@ func (f *factory) CreateMetrics(ctx context.Context, set Settings, cfg component if f.createMetricsFunc == nil { return nil, pipeline.ErrSignalNotSupported } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createMetricsFunc(ctx, set, cfg, next) } @@ -139,6 +150,11 @@ func (f *factory) CreateLogs(ctx context.Context, set Settings, cfg component.Co if f.createLogsFunc == nil { return nil, pipeline.ErrSignalNotSupported } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createLogsFunc(ctx, set, cfg, next) } diff --git a/processor/processor_test.go b/processor/processor_test.go index 3d3ea5dafe8..14bd5e6a2e5 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -14,26 +14,30 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor/internal" +) + +var ( + testType = component.MustNewType("test") + testID = component.NewID(testType) ) func TestNewFactory(t *testing.T) { - testType := component.MustNewType("test") defaultCfg := struct{}{} f := NewFactory( testType, func() component.Config { return &defaultCfg }) assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) - _, err := f.CreateTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err := f.CreateTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) - _, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = f.CreateMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) - _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = f.CreateLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) } func TestNewFactoryWithOptions(t *testing.T) { - testType := component.MustNewType("test") defaultCfg := struct{}{} f := NewFactory( testType, @@ -44,17 +48,26 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) + wrongID := component.MustNewID("wrong") + wrongIDErrStr := internal.ErrIDMismatch(wrongID, testType).Error() + assert.Equal(t, component.StabilityLevelAlpha, f.TracesStability()) - _, err := f.CreateTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err := f.CreateTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) + _, err = f.CreateTraces(context.Background(), Settings{ID: wrongID}, &defaultCfg, consumertest.NewNop()) + require.EqualError(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelBeta, f.MetricsStability()) - _, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = f.CreateMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) + _, err = f.CreateMetrics(context.Background(), Settings{ID: wrongID}, &defaultCfg, consumertest.NewNop()) + require.EqualError(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelUnmaintained, f.LogsStability()) - _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) - assert.NoError(t, err) + _, err = f.CreateLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + _, err = f.CreateLogs(context.Background(), Settings{ID: wrongID}, &defaultCfg, consumertest.NewNop()) + require.EqualError(t, err, wrongIDErrStr) } var nopInstance = &nopProcessor{ diff --git a/processor/processortest/shutdown_verifier.go b/processor/processortest/shutdown_verifier.go index c579ecc3cac..dde956de35e 100644 --- a/processor/processortest/shutdown_verifier.go +++ b/processor/processortest/shutdown_verifier.go @@ -22,7 +22,7 @@ import ( func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.TracesSink) - proc, err := factory.CreateTraces(context.Background(), NewNopSettings(NopType), cfg, nextSink) + proc, err := factory.CreateTraces(context.Background(), NewNopSettings(factory.Type()), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } @@ -46,7 +46,7 @@ func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Fac func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.LogsSink) - proc, err := factory.CreateLogs(context.Background(), NewNopSettings(NopType), cfg, nextSink) + proc, err := factory.CreateLogs(context.Background(), NewNopSettings(factory.Type()), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } @@ -70,7 +70,7 @@ func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Facto func verifyMetricsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.MetricsSink) - proc, err := factory.CreateMetrics(context.Background(), NewNopSettings(NopType), cfg, nextSink) + proc, err := factory.CreateMetrics(context.Background(), NewNopSettings(factory.Type()), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } diff --git a/processor/processortest/unhealthy_processor_test.go b/processor/processortest/unhealthy_processor_test.go index 4398d55bae2..828373af268 100644 --- a/processor/processortest/unhealthy_processor_test.go +++ b/processor/processortest/unhealthy_processor_test.go @@ -26,21 +26,21 @@ func TestNewUnhealthyProcessorFactory(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, &struct{}{}, cfg) - traces, err := factory.CreateTraces(context.Background(), NewNopSettings(NopType), cfg, consumertest.NewNop()) + traces, err := factory.CreateTraces(context.Background(), NewNopSettings(factory.Type()), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) assert.NoError(t, traces.Shutdown(context.Background())) - metrics, err := factory.CreateMetrics(context.Background(), NewNopSettings(NopType), cfg, consumertest.NewNop()) + metrics, err := factory.CreateMetrics(context.Background(), NewNopSettings(factory.Type()), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) assert.NoError(t, metrics.Shutdown(context.Background())) - logs, err := factory.CreateLogs(context.Background(), NewNopSettings(NopType), cfg, consumertest.NewNop()) + logs, err := factory.CreateLogs(context.Background(), NewNopSettings(factory.Type()), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/xprocessor/processor.go b/processor/xprocessor/processor.go index 0b0dff3e98c..92262e53a13 100644 --- a/processor/xprocessor/processor.go +++ b/processor/xprocessor/processor.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/internal" ) // Factory is a component.Factory interface for processors. @@ -65,6 +66,9 @@ func (f factory) CreateProfiles(ctx context.Context, set processor.Settings, cfg if f.createProfilesFunc == nil { return nil, pipeline.ErrSignalNotSupported } + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } return f.createProfilesFunc(ctx, set, cfg, next) } diff --git a/processor/xprocessor/processor_test.go b/processor/xprocessor/processor_test.go index 79c42578336..ee95f9f9275 100644 --- a/processor/xprocessor/processor_test.go +++ b/processor/xprocessor/processor_test.go @@ -8,13 +8,17 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/internal" ) +var testID = component.MustNewID("test") + func TestNewFactoryWithProfiles(t *testing.T) { testType := component.MustNewType("test") defaultCfg := struct{}{} @@ -27,8 +31,13 @@ func TestNewFactoryWithProfiles(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) assert.Equal(t, component.StabilityLevelAlpha, factory.ProfilesStability()) - _, err := factory.CreateProfiles(context.Background(), processor.Settings{}, &defaultCfg, consumertest.NewNop()) - assert.NoError(t, err) + _, err := factory.CreateProfiles(context.Background(), processor.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + + wrongID := component.MustNewID("wrong") + wrongIDErrStr := internal.ErrIDMismatch(wrongID, testType).Error() + _, err = factory.CreateProfiles(context.Background(), processor.Settings{ID: wrongID}, &defaultCfg, consumertest.NewNop()) + assert.EqualError(t, err, wrongIDErrStr) } var nopInstance = &nopProcessor{ diff --git a/receiver/internal/err.go b/receiver/internal/err.go new file mode 100644 index 00000000000..7969e79afcc --- /dev/null +++ b/receiver/internal/err.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/receiver/internal" + +import ( + "fmt" + + "go.opentelemetry.io/collector/component" +) + +func ErrIDMismatch(id component.ID, typ component.Type) error { + return fmt.Errorf("component type mismatch: component ID %q does not have type %q", id, typ) +} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 9a58c8d5405..b3426f83769 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -47,6 +47,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metadata" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -703,7 +704,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) { r, err := NewFactory().CreateTraces( context.Background(), - receivertest.NewNopSettings(receivertest.NopType), + receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop()) require.NoError(t, err) @@ -772,7 +773,7 @@ func TestHTTPInvalidTLSCredentials(t *testing.T) { // TLS is resolved during Start for HTTP. r, err := NewFactory().CreateTraces( context.Background(), - receivertest.NewNopSettings(receivertest.NopType), + receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop()) require.NoError(t, err) @@ -838,7 +839,7 @@ func newHTTPReceiver(t *testing.T, settings component.TelemetrySettings, endpoin } func newReceiver(t *testing.T, settings component.TelemetrySettings, cfg *Config, id component.ID, c consumertest.Consumer) component.Component { - set := receivertest.NewNopSettings(receivertest.NopType) + set := receivertest.NewNopSettings(metadata.Type) set.TelemetrySettings = settings set.ID = id r, err := newOtlpReceiver(cfg, &set) @@ -1015,7 +1016,7 @@ func TestShutdown(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPC.NetAddr.Endpoint = endpointGrpc cfg.HTTP.Endpoint = endpointHTTP - set := receivertest.NewNopSettings(receivertest.NopType) + set := receivertest.NewNopSettings(metadata.Type) set.ID = otlpReceiverID r, err := NewFactory().CreateTraces( context.Background(), diff --git a/receiver/receiver.go b/receiver/receiver.go index b102b5070c6..a90a4917430 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/receiver/internal" ) // Traces receiver receives traces. @@ -141,6 +142,11 @@ func (f *factory) CreateTraces(ctx context.Context, set Settings, cfg component. if f.createTracesFunc == nil { return nil, pipeline.ErrSignalNotSupported } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createTracesFunc(ctx, set, cfg, next) } @@ -148,6 +154,11 @@ func (f *factory) CreateMetrics(ctx context.Context, set Settings, cfg component if f.createMetricsFunc == nil { return nil, pipeline.ErrSignalNotSupported } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createMetricsFunc(ctx, set, cfg, next) } @@ -155,6 +166,11 @@ func (f *factory) CreateLogs(ctx context.Context, set Settings, cfg component.Co if f.createLogsFunc == nil { return nil, pipeline.ErrSignalNotSupported } + + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } + return f.createLogsFunc(ctx, set, cfg, next) } diff --git a/receiver/receiver_test.go b/receiver/receiver_test.go index 84e6c44822b..291ff3e8169 100644 --- a/receiver/receiver_test.go +++ b/receiver/receiver_test.go @@ -14,26 +14,30 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/receiver/internal" +) + +var ( + testType = component.MustNewType("test") + testID = component.NewID(testType) ) func TestNewFactory(t *testing.T) { - testType := component.MustNewType("test") defaultCfg := struct{}{} f := NewFactory( testType, func() component.Config { return &defaultCfg }) assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) - _, err := f.CreateTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err := f.CreateTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) - _, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = f.CreateMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) - _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = f.CreateLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) } func TestNewFactoryWithOptions(t *testing.T) { - testType := component.MustNewType("test") defaultCfg := struct{}{} f := NewFactory( testType, @@ -44,17 +48,26 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) + wrongID := component.MustNewID("wrong") + wrongIDErrStr := internal.ErrIDMismatch(wrongID, testType).Error() + assert.Equal(t, component.StabilityLevelDeprecated, f.TracesStability()) - _, err := f.CreateTraces(context.Background(), Settings{}, &defaultCfg, nil) + _, err := f.CreateTraces(context.Background(), Settings{ID: testID}, &defaultCfg, nil) require.NoError(t, err) + _, err = f.CreateTraces(context.Background(), Settings{ID: wrongID}, &defaultCfg, nil) + require.EqualError(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelAlpha, f.MetricsStability()) - _, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg, nil) + _, err = f.CreateMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, nil) require.NoError(t, err) + _, err = f.CreateMetrics(context.Background(), Settings{ID: wrongID}, &defaultCfg, nil) + require.EqualError(t, err, wrongIDErrStr) assert.Equal(t, component.StabilityLevelStable, f.LogsStability()) - _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, nil) - assert.NoError(t, err) + _, err = f.CreateLogs(context.Background(), Settings{ID: testID}, &defaultCfg, nil) + require.NoError(t, err) + _, err = f.CreateLogs(context.Background(), Settings{ID: wrongID}, &defaultCfg, nil) + require.EqualError(t, err, wrongIDErrStr) } var nopInstance = &nopReceiver{ diff --git a/receiver/receivertest/contract_checker.go b/receiver/receivertest/contract_checker.go index e356093243c..5e08b6a701e 100644 --- a/receiver/receivertest/contract_checker.go +++ b/receiver/receivertest/contract_checker.go @@ -113,11 +113,11 @@ func checkConsumeContractScenario(params CheckConsumeContractParams, decisionFun var err error switch params.Signal { case pipeline.SignalLogs: - receiver, err = params.Factory.CreateLogs(ctx, NewNopSettings(NopType), params.Config, consumer) + receiver, err = params.Factory.CreateLogs(ctx, NewNopSettings(params.Factory.Type()), params.Config, consumer) case pipeline.SignalTraces: - receiver, err = params.Factory.CreateTraces(ctx, NewNopSettings(NopType), params.Config, consumer) + receiver, err = params.Factory.CreateTraces(ctx, NewNopSettings(params.Factory.Type()), params.Config, consumer) case pipeline.SignalMetrics: - receiver, err = params.Factory.CreateMetrics(ctx, NewNopSettings(NopType), params.Config, consumer) + receiver, err = params.Factory.CreateMetrics(ctx, NewNopSettings(params.Factory.Type()), params.Config, consumer) default: require.FailNow(params.T, "must specify a valid DataType to test for") } diff --git a/receiver/xreceiver/profiles.go b/receiver/xreceiver/profiles.go index 6aa5b3d0614..cc068175d90 100644 --- a/receiver/xreceiver/profiles.go +++ b/receiver/xreceiver/profiles.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/internal" ) // Profiles receiver receives profiles. @@ -67,6 +68,9 @@ func (f *factory) CreateProfiles(ctx context.Context, set receiver.Settings, cfg if f.createProfilesFunc == nil { return nil, pipeline.ErrSignalNotSupported } + if set.ID.Type() != f.Type() { + return nil, internal.ErrIDMismatch(set.ID, f.Type()) + } return f.createProfilesFunc(ctx, set, cfg, next) } diff --git a/receiver/xreceiver/receiver_test.go b/receiver/xreceiver/receiver_test.go index 06ff1f592f9..e649f257190 100644 --- a/receiver/xreceiver/receiver_test.go +++ b/receiver/xreceiver/receiver_test.go @@ -8,13 +8,17 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/internal" ) +var testID = component.MustNewID("test") + func TestNewFactoryWithProfiles(t *testing.T) { testType := component.MustNewType("test") defaultCfg := struct{}{} @@ -27,8 +31,12 @@ func TestNewFactoryWithProfiles(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) assert.Equal(t, component.StabilityLevelAlpha, factory.ProfilesStability()) - _, err := factory.CreateProfiles(context.Background(), receiver.Settings{}, &defaultCfg, nil) - assert.NoError(t, err) + _, err := factory.CreateProfiles(context.Background(), receiver.Settings{ID: testID}, &defaultCfg, nil) + require.NoError(t, err) + wrongID := component.MustNewID("wrong") + wrongIDErrStr := internal.ErrIDMismatch(wrongID, testType).Error() + _, err = factory.CreateProfiles(context.Background(), receiver.Settings{ID: wrongID}, &defaultCfg, nil) + assert.EqualError(t, err, wrongIDErrStr) } var nopInstance = &nopReceiver{