diff --git a/collector/components/azureeventhubreceiver/Makefile b/collector/components/azureeventhubreceiver/Makefile deleted file mode 100644 index ded7a36..0000000 --- a/collector/components/azureeventhubreceiver/Makefile +++ /dev/null @@ -1 +0,0 @@ -include ../../Makefile.Common diff --git a/collector/components/azureeventhubreceiver/azureresourcelogs_unmarshaler_test.go b/collector/components/azureeventhubreceiver/azureresourcelogs_unmarshaler_test.go new file mode 100644 index 0000000..82a1f2a --- /dev/null +++ b/collector/components/azureeventhubreceiver/azureresourcelogs_unmarshaler_test.go @@ -0,0 +1,118 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package azureeventhubreceiver + +import ( + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +// TestUnmarshalLogs_Body should succeed regardless of body content type +func TestUnmarshalLogs_Body(t *testing.T) { + logger := zap.NewNop() + unmarshaler := newRawLogsUnmarshaler(logger) + + testCases := []struct { + name string + body []byte + expect []byte + }{ + { + name: "empty body", + body: []byte(""), + // note that zero length body sets otlp Body to []byte(nil) not []byte{} + expect: []byte(nil), + }, + { + name: "nil body", + body: []byte(nil), + expect: []byte(nil), + }, + { + name: "invalid json", + body: []byte("{malformed-json"), + expect: []byte("{malformed-json"), + }, + { + name: "valid json", + body: []byte(`{"key": "value"}`), + expect: []byte(`{"key": "value"}`), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := &azeventhubs.ReceivedEventData{ + EventData: azeventhubs.EventData{ + Body: tc.body, + Properties: map[string]interface{}{"someKey": "someValue"}, + }, + } + logs, err := unmarshaler.UnmarshalLogs(event) + require.NoError(t, err, "Expected no error for valid event data but got one") + require.Equal(t, 1, logs.ResourceLogs().Len(), "Expected 1 ResourceLog") + otlpBody := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().Bytes().AsRaw() + require.Equal(t, otlpBody, tc.expect) + }) + } +} + +func TestUnmarshalLogs_Attributes(t *testing.T) { + logger := zap.NewNop() + unmarshaler := newRawLogsUnmarshaler(logger) + + testCases := []struct { + name string + properties map[string]any + expect map[string]any + }{ + { + name: "empty properties", + properties: map[string]any{}, + expect: map[string]any(nil), + }, + { + name: "nil properties", + properties: map[string]any(nil), + expect: map[string]any(nil), + }, + { + name: "single property", + properties: map[string]interface{}{"someKey": "someValue"}, + expect: map[string]interface{}{"someKey": "someValue"}, + }, + { + name: "multiple properties", + properties: map[string]interface{}{"someKey": "someValue", "anotherKey": "anotherValue"}, + expect: map[string]interface{}{"someKey": "someValue", "anotherKey": "anotherValue"}, + }, + } + + var et = time.Now() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := &azeventhubs.ReceivedEventData{ + EnqueuedTime: &et, + SystemProperties: map[string]interface{}{"syskey1": "sysval1", "syskey2": "sysval2"}, + EventData: azeventhubs.EventData{ + Body: []byte(""), + Properties: tc.properties, + }, + } + logs, err := unmarshaler.UnmarshalLogs(event) + require.NoError(t, err, "Expected no error for valid event data but got one") + resAttrs := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw() + // this is because + require.Equal(t, len(resAttrs), len(tc.expect), "Expected %d attributes, got %d", len(tc.expect), len(resAttrs)) + if len(resAttrs) == 0 { + return + } + + require.Equal(t, resAttrs, tc.expect, "Not equal:\nexpected: %v\nactual: %v", tc.expect, resAttrs) + }) + } +} diff --git a/collector/components/azureeventhubreceiver/config.go b/collector/components/azureeventhubreceiver/config.go index 7133dd7..9ccb992 100644 --- a/collector/components/azureeventhubreceiver/config.go +++ b/collector/components/azureeventhubreceiver/config.go @@ -25,19 +25,12 @@ var ( ) type Config struct { - Connection string `mapstructure:"connection"` - EventHubName string `mapstructure:"eventhub"` - Partition string `mapstructure:"partition"` - Offset string `mapstructure:"offset"` - // - StorageID *component.ID `mapstructure:"storage"` - StorageConnection string `mapstructure:"storage_connection"` - StorageContainer string `mapstructure:"storage_container"` - // - Format string `mapstructure:"format"` - ConsumerGroup string `mapstructure:"group"` - BatchDelay string `mapstructure:"batch_delay"` - BatchCount int `mapstructure:"batch_count"` + Connection string `mapstructure:"connection"` + Partition string `mapstructure:"partition"` + Offset string `mapstructure:"offset"` + StorageID *component.ID `mapstructure:"storage"` + Format string `mapstructure:"format"` + ConsumerGroup string `mapstructure:"group"` } func isValidFormat(format string) bool { diff --git a/collector/components/azureeventhubreceiver/eventhubhandler.go b/collector/components/azureeventhubreceiver/eventhubhandler.go index 061cebc..f5f8070 100644 --- a/collector/components/azureeventhubreceiver/eventhubhandler.go +++ b/collector/components/azureeventhubreceiver/eventhubhandler.go @@ -67,14 +67,6 @@ func (c *consumerClientWrapperImpl) Close(ctx context.Context) error { return c.consumerClient.Close(ctx) } -// type processorHandler struct { -// processor *azeventhubs.Processor -// dataConsumer dataConsumer -// config *Config -// settings receiver.CreateSettings -// cancel context.CancelFunc -// } - type eventhubHandler struct { processor *azeventhubs.Processor consumerClient consumerClientWrapper @@ -92,9 +84,6 @@ func newEventhubHandler(config *Config, settings receiver.CreateSettings) *event // Check if the configuration is meant for testing. This can be done by checking a specific field or a pattern in the connection string. if strings.Contains(config.Connection, "fake.servicebus.windows.net") { return nil - // Return a mock handler if the connection string is empty or obviously fake. - // return newMockEventhubHandler() - // return newMockEventhubHandler(config, settings) } return &eventhubHandler{ diff --git a/collector/components/azureeventhubreceiver/generated_component_test.go b/collector/components/azureeventhubreceiver/generated_component_test.go index b0f794e..423c718 100644 --- a/collector/components/azureeventhubreceiver/generated_component_test.go +++ b/collector/components/azureeventhubreceiver/generated_component_test.go @@ -9,11 +9,9 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/collector/confmap/confmaptest" ) diff --git a/collector/components/azureeventhubreceiver/go.mod b/collector/components/azureeventhubreceiver/go.mod index 67629d4..f0b6b2f 100644 --- a/collector/components/azureeventhubreceiver/go.mod +++ b/collector/components/azureeventhubreceiver/go.mod @@ -104,7 +104,7 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza => ../../pkg/stanza +// replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza => ../../pkg/stanza retract ( v0.76.2 diff --git a/collector/components/azureeventhubreceiver/rawlogs_unmarshaler.go b/collector/components/azureeventhubreceiver/rawlogs_unmarshaler.go index f5019df..e416827 100644 --- a/collector/components/azureeventhubreceiver/rawlogs_unmarshaler.go +++ b/collector/components/azureeventhubreceiver/rawlogs_unmarshaler.go @@ -35,43 +35,3 @@ func (r rawLogsUnmarshaler) UnmarshalLogs(event *azeventhubs.ReceivedEventData) return l, nil } - -// // Copyright The OpenTelemetry Authors -// // SPDX-License-Identifier: Apache-2.0 - -// package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" - -// import ( -// "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" -// "go.opentelemetry.io/collector/pdata/pcommon" -// "go.opentelemetry.io/collector/pdata/plog" -// "go.uber.org/zap" -// ) - -// type rawLogsUnmarshaler struct { -// logger *zap.Logger -// } - -// func newRawLogsUnmarshaler(logger *zap.Logger) eventLogsUnmarshaler { - -// return rawLogsUnmarshaler{ -// logger: logger, -// } -// } - -// func (r rawLogsUnmarshaler) UnmarshalLogs(event *[]azeventhubs.ReceivedEventData) (plog.Logs, error) { - -// l := plog.NewLogs() -// lr := l.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() -// slice := lr.Body().SetEmptyBytes() -// slice.Append(event.EventData...) -// if event.SystemProperties.EnqueuedTime != nil { -// lr.SetTimestamp(pcommon.NewTimestampFromTime(*event.SystemProperties.EnqueuedTime)) -// } - -// if err := lr.Attributes().FromRaw(event.Properties); err != nil { -// return l, err -// } - -// return l, nil -// }