Skip to content

Commit bf56e73

Browse files
authored
ForEachInput pattern wrapper [Open to discussion] (#571)
* ForEachInput pattern * Fix copy/reformate error
1 parent 17720c0 commit bf56e73

File tree

22 files changed

+295
-329
lines changed

22 files changed

+295
-329
lines changed

pkg/components/appolly/appolly.go

Lines changed: 26 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.opentelemetry.io/obi/pkg/obi"
2424
"go.opentelemetry.io/obi/pkg/pipe/msg"
2525
"go.opentelemetry.io/obi/pkg/pipe/swarm"
26+
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
2627
"go.opentelemetry.io/obi/pkg/transform"
2728
)
2829

@@ -146,44 +147,34 @@ func (i *Instrumenter) WaitUntilFinished() error {
146147

147148
func (i *Instrumenter) instrumentedEventLoop(ctx context.Context, processEvents <-chan discover.Event[*ebpf.Instrumentable]) {
148149
log := log()
149-
for {
150-
select {
151-
case <-ctx.Done():
152-
return
153-
case ev, ok := <-processEvents:
154-
if !ok {
155-
log.Error("processEvents channel closed. Application instrumentation has stopped working")
156-
return
150+
swarms.ForEachInput(ctx, processEvents, log.Debug, func(ev discover.Event[*ebpf.Instrumentable]) {
151+
switch ev.Type {
152+
case discover.EventCreated:
153+
pt := ev.Obj
154+
log.Debug("running tracer for new process",
155+
"inode", pt.FileInfo.Ino, "pid", pt.FileInfo.Pid, "exec", pt.FileInfo.CmdExePath)
156+
if pt.Tracer != nil {
157+
i.tracersWg.Add(1)
158+
go func() {
159+
defer i.tracersWg.Done()
160+
pt.Tracer.Run(ctx, i.ebpfEventContext, i.tracesInput)
161+
}()
157162
}
158-
159-
switch ev.Type {
160-
case discover.EventCreated:
161-
pt := ev.Obj
162-
log.Debug("running tracer for new process",
163-
"inode", pt.FileInfo.Ino, "pid", pt.FileInfo.Pid, "exec", pt.FileInfo.CmdExePath)
164-
if pt.Tracer != nil {
165-
i.tracersWg.Add(1)
166-
go func() {
167-
defer i.tracersWg.Done()
168-
pt.Tracer.Run(ctx, i.ebpfEventContext, i.tracesInput)
169-
}()
170-
}
171-
i.handleAndDispatchProcessEvent(exec.ProcessEvent{Type: exec.ProcessEventCreated, File: pt.FileInfo})
172-
case discover.EventDeleted:
173-
dp := ev.Obj
174-
log.Debug("stopping ProcessTracer because there are no more instances of such process",
175-
"inode", dp.FileInfo.Ino, "pid", dp.FileInfo.Pid, "exec", dp.FileInfo.CmdExePath)
176-
if dp.Tracer != nil {
177-
dp.Tracer.UnlinkExecutable(dp.FileInfo)
178-
}
179-
i.handleAndDispatchProcessEvent(exec.ProcessEvent{Type: exec.ProcessEventTerminated, File: dp.FileInfo})
180-
case discover.EventInstanceDeleted:
181-
i.handleAndDispatchProcessEvent(exec.ProcessEvent{Type: exec.ProcessEventTerminated, File: ev.Obj.FileInfo})
182-
default:
183-
log.Error("BUG ALERT! unknown event type", "type", ev.Type)
163+
i.handleAndDispatchProcessEvent(exec.ProcessEvent{Type: exec.ProcessEventCreated, File: pt.FileInfo})
164+
case discover.EventDeleted:
165+
dp := ev.Obj
166+
log.Debug("stopping ProcessTracer because there are no more instances of such process",
167+
"inode", dp.FileInfo.Ino, "pid", dp.FileInfo.Pid, "exec", dp.FileInfo.CmdExePath)
168+
if dp.Tracer != nil {
169+
dp.Tracer.UnlinkExecutable(dp.FileInfo)
184170
}
171+
i.handleAndDispatchProcessEvent(exec.ProcessEvent{Type: exec.ProcessEventTerminated, File: dp.FileInfo})
172+
case discover.EventInstanceDeleted:
173+
i.handleAndDispatchProcessEvent(exec.ProcessEvent{Type: exec.ProcessEventTerminated, File: ev.Obj.FileInfo})
174+
default:
175+
log.Error("BUG ALERT! unknown event type", "type", ev.Type)
185176
}
186-
}
177+
})
187178
}
188179

189180
// ReadAndForward keeps listening for traces in the BPF map, then reads,

pkg/components/discover/attacher.go

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package discover
55

66
import (
77
"context"
8+
"fmt"
89
"log/slog"
910
"os"
1011

1112
"github.com/cilium/ebpf/link"
13+
"github.com/cilium/ebpf/rlimit"
1214

1315
"go.opentelemetry.io/obi/pkg/app/request"
1416
"go.opentelemetry.io/obi/pkg/components/ebpf"
@@ -22,6 +24,7 @@ import (
2224
"go.opentelemetry.io/obi/pkg/obi"
2325
"go.opentelemetry.io/obi/pkg/pipe/msg"
2426
"go.opentelemetry.io/obi/pkg/pipe/swarm"
27+
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
2528
)
2629

2730
// TraceAttacher creates the available trace.Tracer implementations (Go HTTP tracer, GRPC tracer, Generic tracer...)
@@ -91,49 +94,36 @@ func (ta *TraceAttacher) attacherLoop(_ context.Context) (swarm.RunFunc, error)
9194
in := ta.InputInstrumentables.Subscribe()
9295
return func(ctx context.Context) {
9396
defer ta.OutputTracerEvents.Close()
94-
95-
for {
96-
select {
97-
case <-ctx.Done():
98-
ta.log.Debug("context done. terminating process attacher")
99-
ta.close()
100-
return
101-
case instrumentables, ok := <-in:
102-
if !ok {
103-
ta.log.Debug("input channel closed. terminating process attacher")
104-
ta.close()
105-
return
106-
}
107-
for _, instr := range instrumentables {
108-
ta.log.Debug("Instrumentable", "created", instr.Type, "type", instr.Obj.Type,
109-
"exec", instr.Obj.FileInfo.CmdExePath, "pid", instr.Obj.FileInfo.Pid)
110-
switch instr.Type {
111-
case EventCreated:
112-
sdkInstrumented := false
113-
if ta.sdkInjectionPossible(&instr.Obj) {
114-
if err := ta.sdkInjector.NewExecutable(&instr.Obj); err == nil {
115-
sdkInstrumented = true
116-
}
97+
swarms.ForEachInput(ctx, in, ta.log.Debug, func(instrumentables []Event[ebpf.Instrumentable]) {
98+
for _, instr := range instrumentables {
99+
ta.log.Debug("Instrumentable", "created", instr.Type, "type", instr.Obj.Type,
100+
"exec", instr.Obj.FileInfo.CmdExePath, "pid", instr.Obj.FileInfo.Pid)
101+
switch instr.Type {
102+
case EventCreated:
103+
sdkInstrumented := false
104+
if ta.sdkInjectionPossible(&instr.Obj) {
105+
if err := ta.sdkInjector.NewExecutable(&instr.Obj); err == nil {
106+
sdkInstrumented = true
117107
}
108+
}
118109

119-
if !sdkInstrumented {
120-
ta.nodeInjector.NewExecutable(&instr.Obj)
110+
if !sdkInstrumented {
111+
ta.nodeInjector.NewExecutable(&instr.Obj)
121112

122-
ta.processInstances.Inc(instr.Obj.FileInfo.Ino)
123-
if ok := ta.getTracer(&instr.Obj); ok {
124-
ta.OutputTracerEvents.Send(Event[*ebpf.Instrumentable]{Type: EventCreated, Obj: &instr.Obj})
125-
}
113+
ta.processInstances.Inc(instr.Obj.FileInfo.Ino)
114+
if ok := ta.getTracer(&instr.Obj); ok {
115+
ta.OutputTracerEvents.Send(Event[*ebpf.Instrumentable]{Type: EventCreated, Obj: &instr.Obj})
116+
}
126117

127-
if instr.Obj.FileInfo.ELF != nil {
128-
_ = instr.Obj.FileInfo.ELF.Close()
129-
}
118+
if instr.Obj.FileInfo.ELF != nil {
119+
_ = instr.Obj.FileInfo.ELF.Close()
130120
}
131-
case EventDeleted:
132-
ta.notifyProcessDeletion(&instr.Obj)
133121
}
122+
case EventDeleted:
123+
ta.notifyProcessDeletion(&instr.Obj)
134124
}
135125
}
136-
}
126+
})
137127
}, nil
138128
}
139129

@@ -397,3 +387,10 @@ func (ta *TraceAttacher) notifyProcessDeletion(ie *ebpf.Instrumentable) {
397387
func (ta *TraceAttacher) sdkInjectionPossible(ie *ebpf.Instrumentable) bool {
398388
return ta.sdkInjector.Enabled() && ie.Type == svc.InstrumentableJava
399389
}
390+
391+
func (ta *TraceAttacher) init() error {
392+
if err := rlimit.RemoveMemlock(); err != nil {
393+
return fmt.Errorf("removing memory lock: %w", err)
394+
}
395+
return nil
396+
}

pkg/components/discover/attacher_linux.go

Lines changed: 0 additions & 20 deletions
This file was deleted.

pkg/components/discover/attacher_nolinux.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

pkg/components/discover/container_updater.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.opentelemetry.io/obi/pkg/components/kube"
1313
"go.opentelemetry.io/obi/pkg/pipe/msg"
1414
"go.opentelemetry.io/obi/pkg/pipe/swarm"
15+
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
1516
)
1617

1718
// ContainerDBUpdaterProvider is a stage in the Process Finder pipeline that will be
@@ -36,9 +37,9 @@ func updateLoop(
3637
db *kube.Store, in <-chan []Event[ebpf.Instrumentable], out *msg.Queue[[]Event[ebpf.Instrumentable]],
3738
) swarm.RunFunc {
3839
log := slog.With("component", "ContainerDBUpdater")
39-
return func(_ context.Context) {
40+
return func(ctx context.Context) {
4041
defer out.Close()
41-
for instrumentables := range in {
42+
swarms.ForEachInput(ctx, in, log.Debug, func(instrumentables []Event[ebpf.Instrumentable]) {
4243
for i := range instrumentables {
4344
ev := &instrumentables[i]
4445
switch ev.Type {
@@ -51,6 +52,6 @@ func updateLoop(
5152
}
5253
}
5354
out.Send(instrumentables)
54-
}
55+
})
5556
}
5657
}

pkg/components/discover/matcher.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"go.opentelemetry.io/obi/pkg/obi"
1818
"go.opentelemetry.io/obi/pkg/pipe/msg"
1919
"go.opentelemetry.io/obi/pkg/pipe/swarm"
20+
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
2021
"go.opentelemetry.io/obi/pkg/services"
2122
)
2223

@@ -71,24 +72,14 @@ type ProcessMatch struct {
7172
func (m *Matcher) Run(ctx context.Context) {
7273
defer m.Output.Close()
7374
m.Log.Debug("starting criteria matcher node")
74-
for {
75-
select {
76-
case <-ctx.Done():
77-
m.Log.Debug("context cancelled, stopping criteria matcher node")
78-
return
79-
case i, ok := <-m.Input:
80-
if !ok {
81-
m.Log.Debug("input channel closed, stopping criteria matcher node")
82-
return
83-
}
84-
m.Log.Debug("filtering processes", "len", len(i))
85-
o := m.filter(i)
86-
m.Log.Debug("processes matching selection criteria", "len", len(o))
87-
if len(o) > 0 {
88-
m.Output.Send(o)
89-
}
75+
swarms.ForEachInput(ctx, m.Input, m.Log.Debug, func(i []Event[ProcessAttrs]) {
76+
m.Log.Debug("filtering processes", "len", len(i))
77+
o := m.filter(i)
78+
m.Log.Debug("processes matching selection criteria", "len", len(o))
79+
if len(o) > 0 {
80+
m.Output.Send(o)
9081
}
91-
}
82+
})
9283
}
9384

9485
func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] {

pkg/components/discover/typer.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.opentelemetry.io/obi/pkg/obi"
2424
"go.opentelemetry.io/obi/pkg/pipe/msg"
2525
"go.opentelemetry.io/obi/pkg/pipe/swarm"
26+
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
2627
"go.opentelemetry.io/obi/pkg/services"
2728
)
2829

@@ -52,26 +53,17 @@ func ExecTyperProvider(
5253
currentPids: map[int32]*exec.FileInfo{},
5354
instrumentableCache: instrumentableCache,
5455
}
55-
return func(ctx context.Context) (swarm.RunFunc, error) {
56+
return func(_ context.Context) (swarm.RunFunc, error) {
5657
// TODO: do it per executable
5758
if !cfg.Discovery.SkipGoSpecificTracers {
5859
t.loadAllGoFunctionNames()
5960
}
6061
in := input.Subscribe()
61-
return func(_ context.Context) {
62+
return func(ctx context.Context) {
6263
defer output.Close()
63-
for {
64-
select {
65-
case <-ctx.Done():
66-
t.log.Debug("context cancelled, closing ExecTyper")
67-
return
68-
case i, ok := <-in:
69-
if !ok {
70-
return
71-
}
72-
output.Send(t.FilterClassify(i))
73-
}
74-
}
64+
swarms.ForEachInput(ctx, in, t.log.Debug, func(i []Event[ProcessMatch]) {
65+
output.Send(t.FilterClassify(i))
66+
})
7567
}, nil
7668
}
7769
}

pkg/components/discover/watcher_proc.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"go.opentelemetry.io/obi/pkg/obi"
2828
"go.opentelemetry.io/obi/pkg/pipe/msg"
2929
"go.opentelemetry.io/obi/pkg/pipe/swarm"
30+
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
3031
"go.opentelemetry.io/obi/pkg/services"
3132
)
3233

@@ -197,24 +198,19 @@ func portOfInterest(criteria []services.Selector, port int) bool {
197198
}
198199

199200
func (pa *pollAccounter) watchForProcessEvents(ctx context.Context, log *slog.Logger, events <-chan watcher.Event) {
200-
for {
201-
select {
202-
case <-ctx.Done():
203-
return
204-
case e := <-events:
205-
switch e.Type {
206-
case watcher.Ready:
207-
pa.bpfWatcherIsReady()
208-
case watcher.NewPort:
209-
port := int(e.Payload)
210-
if pa.cfg.Port.Matches(port) || portOfInterest(pa.findingCriteria, port) {
211-
pa.refetchPorts()
212-
}
213-
default:
214-
log.Warn("Unknown ebpf process watch event", "type", e.Type)
201+
swarms.ForEachInput(ctx, events, log.Debug, func(e watcher.Event) {
202+
switch e.Type {
203+
case watcher.Ready:
204+
pa.bpfWatcherIsReady()
205+
case watcher.NewPort:
206+
port := int(e.Payload)
207+
if pa.cfg.Port.Matches(port) || portOfInterest(pa.findingCriteria, port) {
208+
pa.refetchPorts()
215209
}
210+
default:
211+
log.Warn("Unknown ebpf process watch event", "type", e.Type)
216212
}
217-
}
213+
})
218214
}
219215

220216
func (pa *pollAccounter) processTooNew(proc ProcessAttrs) bool {

pkg/components/netolly/flow/decorator.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"go.opentelemetry.io/obi/pkg/components/netolly/ebpf"
2929
"go.opentelemetry.io/obi/pkg/pipe/msg"
3030
"go.opentelemetry.io/obi/pkg/pipe/swarm"
31+
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
3132
)
3233

3334
type InterfaceNamer func(ifIndex int) string
@@ -40,9 +41,9 @@ type InterfaceNamer func(ifIndex int) string
4041
func Decorate(agentIP net.IP, ifaceNamer InterfaceNamer, input *msg.Queue[[]*ebpf.Record], output *msg.Queue[[]*ebpf.Record]) swarm.RunFunc {
4142
ip := agentIP.String()
4243
in := input.Subscribe()
43-
return func(_ context.Context) {
44+
return func(ctx context.Context) {
4445
defer output.Close()
45-
for flows := range in {
46+
swarms.ForEachInput(ctx, in, nil, func(flows []*ebpf.Record) {
4647
for _, flow := range flows {
4748
flow.Attrs.Interface = ifaceNamer(int(flow.Id.IfIndex))
4849
flow.Attrs.OBIIP = ip
@@ -54,6 +55,6 @@ func Decorate(agentIP net.IP, ifaceNamer InterfaceNamer, input *msg.Queue[[]*ebp
5455
}
5556
}
5657
output.Send(flows)
57-
}
58+
})
5859
}
5960
}

0 commit comments

Comments
 (0)