Skip to content

Commit aca08a8

Browse files
mauri870mergify[bot]
authored andcommitted
oteltest: make CheckReceivers environment more similar to the collector (#43302)
* oteltest: make CheckReceivers environment more similar to the collector The CheckReceivers function is designed to start receivers in the same process to assert conditions. This simulated environment should closely resemble the real OTel collector startup. This PR fixes some inconsistencies in the test environment. First, when instantiating receivers, the collector first creates them using the factory and only then starts each component. Previously, we were creating and starting each receiver sequentially, which is incorrect and masked issues with global state when multiple receivers were present. Second, a Beats receiver logger inherits from the zap.Core of the collector logger. This core includes certain fields—specifically data_type, kind, and name. In the tests, these fields were previously missing, so this PR ensures they are included. * remove i++ * use EventuallyWithT for assertions * fix imports * specify factory per receiver (cherry picked from commit 8d31036)
1 parent 8ddd793 commit aca08a8

File tree

3 files changed

+86
-55
lines changed

3 files changed

+86
-55
lines changed

Diff for: libbeat/otelbeat/oteltest/oteltest.go

+40-17
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,18 @@ import (
3939
)
4040

4141
type ReceiverConfig struct {
42-
Name string
43-
Config component.Config
42+
Name string
43+
Config component.Config
44+
Factory receiver.Factory
4445
}
4546

4647
type CheckReceiversParams struct {
4748
T *testing.T
48-
// Factory that allows to create a receiver.
49-
Factory receiver.Factory
5049
// Receivers is a list of receiver configurations to create.
5150
Receivers []ReceiverConfig
5251
// AssertFunc is a function that asserts the test conditions.
53-
// The function is called periodically until it returns true which ends the test.
54-
AssertFunc func(t *testing.T, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) bool
52+
// The function is called periodically until the assertions are met or the timeout is reached.
53+
AssertFunc func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs)
5554
}
5655

5756
// CheckReceivers creates receivers using the provided configuration.
@@ -69,11 +68,22 @@ func CheckReceivers(params CheckReceiversParams) {
6968
)
7069
observed, zapLogs := observer.New(zapcore.DebugLevel)
7170

72-
createReceiver := func(t *testing.T, name string, cfg component.Config) receiver.Logs {
71+
core := zapcore.NewTee(zapCore, observed)
72+
73+
createReceiver := func(t *testing.T, rc ReceiverConfig) receiver.Logs {
7374
t.Helper()
7475

7576
var receiverSettings receiver.Settings
76-
receiverSettings.Logger = zap.New(zapcore.NewTee(zapCore, observed)).Named(name)
77+
78+
// Replicate the behavior of the collector logger
79+
receiverCore := core.
80+
With([]zapcore.Field{
81+
zap.String("name", rc.Name),
82+
zap.String("kind", "receiver"),
83+
zap.String("data_type", "logs"),
84+
})
85+
86+
receiverSettings.Logger = zap.New(receiverCore)
7787

7888
logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
7989
for i := 0; i < ld.ResourceLogs().Len(); i++ {
@@ -83,33 +93,46 @@ func CheckReceivers(params CheckReceiversParams) {
8393
for k := 0; k < sl.LogRecords().Len(); k++ {
8494
log := sl.LogRecords().At(k)
8595
logsMu.Lock()
86-
logs[name] = append(logs[name], log.Body().Map().AsRaw())
96+
logs[rc.Name] = append(logs[rc.Name], log.Body().Map().AsRaw())
8797
logsMu.Unlock()
8898
}
8999
}
90100
}
91101
return nil
92102
})
93-
assert.NoErrorf(t, err, "Error creating log consumer for %q", name)
103+
assert.NoErrorf(t, err, "Error creating log consumer for %q", rc.Name)
94104

95-
r, err := params.Factory.CreateLogs(ctx, receiverSettings, cfg, logConsumer)
96-
assert.NoErrorf(t, err, "Error creating receiver %q", name)
105+
r, err := rc.Factory.CreateLogs(ctx, receiverSettings, rc.Config, logConsumer)
106+
assert.NoErrorf(t, err, "Error creating receiver %q", rc.Name)
97107
return r
98108
}
99109

110+
// Replicate the collector behavior to instantiate components first and then start them.
111+
var receivers []receiver.Logs
100112
for _, rec := range params.Receivers {
101-
r := createReceiver(t, rec.Name, rec.Config)
113+
receivers = append(receivers, createReceiver(t, rec))
114+
}
115+
116+
for i, r := range receivers {
102117
err := r.Start(ctx, nil)
103-
require.NoErrorf(t, err, "Error starting receiver %q", rec.Name)
118+
require.NoErrorf(t, err, "Error starting receiver %d", i)
104119
defer func() {
105-
require.NoErrorf(t, r.Shutdown(ctx), "Error shutting down receiver %q", rec.Name)
120+
require.NoErrorf(t, r.Shutdown(ctx), "Error shutting down receiver %d", i)
106121
}()
107122
}
108123

109-
require.Eventually(t, func() bool {
124+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
110125
logsMu.Lock()
111126
defer logsMu.Unlock()
112127

113-
return params.AssertFunc(t, logs, zapLogs)
128+
// Ensure the logger fields from the otel collector are present in the logs.
129+
for _, zl := range zapLogs.All() {
130+
require.Contains(t, zl.ContextMap(), "name")
131+
require.Equal(t, zl.ContextMap()["kind"], "receiver")
132+
require.Equal(t, zl.ContextMap()["data_type"], "logs")
133+
break
134+
}
135+
136+
params.AssertFunc(ct, logs, zapLogs)
114137
}, time.Minute, 100*time.Millisecond, "timeout waiting for assertion to pass")
115138
}

Diff for: x-pack/filebeat/fbreceiver/receiver_test.go

+24-24
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
1313
"github.com/elastic/elastic-agent-libs/mapstr"
1414

15+
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
"go.opentelemetry.io/collector/receiver"
1718
"go.uber.org/zap"
@@ -46,17 +47,17 @@ func TestNewReceiver(t *testing.T) {
4647
}
4748

4849
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
49-
T: t,
50-
Factory: NewFactory(),
50+
T: t,
5151
Receivers: []oteltest.ReceiverConfig{
5252
{
53-
Name: "r1",
54-
Config: &config,
53+
Name: "r1",
54+
Config: &config,
55+
Factory: NewFactory(),
5556
},
5657
},
57-
AssertFunc: func(t *testing.T, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) bool {
58+
AssertFunc: func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
5859
_ = zapLogs
59-
return len(logs["r1"]) == 1
60+
assert.Len(t, logs["r1"], 1)
6061
},
6162
})
6263
}
@@ -88,18 +89,16 @@ func TestReceiverDefaultProcessors(t *testing.T) {
8889
}
8990

9091
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
91-
T: t,
92-
Factory: NewFactory(),
92+
T: t,
9393
Receivers: []oteltest.ReceiverConfig{
9494
{
95-
Name: "r1",
96-
Config: &config,
95+
Name: "r1",
96+
Config: &config,
97+
Factory: NewFactory(),
9798
},
9899
},
99-
AssertFunc: func(t *testing.T, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) bool {
100-
if len(logs["r1"]) == 0 {
101-
return false
102-
}
100+
AssertFunc: func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
101+
require.Len(t, logs["r1"], 1)
103102

104103
processorsLoaded := zapLogs.FilterMessageSnippet("Generated new processors").
105104
FilterMessageSnippet("add_host_metadata").
@@ -110,8 +109,6 @@ func TestReceiverDefaultProcessors(t *testing.T) {
110109
require.True(t, processorsLoaded, "processors not loaded")
111110
// Check that add_host_metadata works, other processors are not guaranteed to add fields in all environments
112111
require.Contains(t, logs["r1"][0].Flatten(), "host.architecture")
113-
114-
return true
115112
},
116113
})
117114
}
@@ -187,22 +184,25 @@ func TestMultipleReceivers(t *testing.T) {
187184
},
188185
}
189186

187+
factory := NewFactory()
190188
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
191-
T: t,
192-
Factory: NewFactory(),
189+
T: t,
193190
Receivers: []oteltest.ReceiverConfig{
194191
{
195-
Name: "r1",
196-
Config: &config,
192+
Name: "r1",
193+
Config: &config,
194+
Factory: factory,
197195
},
198196
{
199-
Name: "r2",
200-
Config: &config,
197+
Name: "r2",
198+
Config: &config,
199+
Factory: factory,
201200
},
202201
},
203-
AssertFunc: func(t *testing.T, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) bool {
202+
AssertFunc: func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
204203
_ = zapLogs
205-
return len(logs["r1"]) == 1 && len(logs["r2"]) == 1
204+
require.Len(t, logs["r1"], 1)
205+
require.Len(t, logs["r2"], 1)
206206
},
207207
})
208208
}

Diff for: x-pack/metricbeat/mbreceiver/receiver_test.go

+22-14
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
1111
"github.com/elastic/elastic-agent-libs/mapstr"
1212

13+
"github.com/stretchr/testify/assert"
14+
1315
"go.uber.org/zap/zaptest/observer"
1416
)
1517

@@ -41,17 +43,19 @@ func TestNewReceiver(t *testing.T) {
4143
}
4244

4345
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
44-
T: t,
45-
Factory: NewFactory(),
46+
T: t,
4647
Receivers: []oteltest.ReceiverConfig{
4748
{
48-
Name: "r1",
49-
Config: &config,
49+
Name: "r1",
50+
Config: &config,
51+
Factory: NewFactory(),
5052
},
5153
},
52-
AssertFunc: func(t *testing.T, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) bool {
54+
AssertFunc: func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
5355
_ = zapLogs
54-
return len(logs["r1"]) > 0
56+
assert.Conditionf(t, func() bool {
57+
return len(logs["r1"]) > 0
58+
}, "expected at least one ingest log, got logs: %v", logs["r1"])
5559
},
5660
})
5761
}
@@ -83,22 +87,26 @@ func TestMultipleReceivers(t *testing.T) {
8387
},
8488
}
8589

90+
factory := NewFactory()
8691
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
87-
T: t,
88-
Factory: NewFactory(),
92+
T: t,
8993
Receivers: []oteltest.ReceiverConfig{
9094
{
91-
Name: "r1",
92-
Config: &config,
95+
Name: "r1",
96+
Config: &config,
97+
Factory: factory,
9398
},
9499
{
95-
Name: "r2",
96-
Config: &config,
100+
Name: "r2",
101+
Config: &config,
102+
Factory: factory,
97103
},
98104
},
99-
AssertFunc: func(t *testing.T, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) bool {
105+
AssertFunc: func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
100106
_ = zapLogs
101-
return len(logs["r1"]) > 0 && len(logs["r2"]) > 0
107+
assert.Conditionf(t, func() bool {
108+
return len(logs["r1"]) > 0 && len(logs["r2"]) > 0
109+
}, "expected at least one ingest log for each receiver, got logs: %v", logs)
102110
},
103111
})
104112
}

0 commit comments

Comments
 (0)