Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/components/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (ta *TraceAttacher) attacherLoop(_ context.Context) (swarm.RunFunc, error)
ta.processInstances = maps.MultiCounter[uint64]{}
ta.beylaPID = os.Getpid()
ta.EbpfEventContext.CommonPIDsFilter = ebpfcommon.CommonPIDsFilter(&ta.Cfg.Discovery, ta.Metrics)
ta.routeHarvester = harvest.NewRouteHarvester(ta.Cfg.Discovery.DisabledRouteHarvesters)
ta.routeHarvester = harvest.NewRouteHarvester(ta.Cfg.Discovery.DisabledRouteHarvesters, ta.Cfg.Discovery.RouteHarvesterTimeout)

if err := ta.init(); err != nil {
ta.log.Error("cant start process tracer. Stopping it", "error", err)
Expand Down
70 changes: 63 additions & 7 deletions pkg/components/transform/route/harvest/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package harvest

import (
"context"
"log/slog"
"strings"
"time"

"go.opentelemetry.io/obi/pkg/components/exec"
"go.opentelemetry.io/obi/pkg/components/svc"
Expand All @@ -16,6 +18,10 @@ type RouteHarvester struct {
log *slog.Logger
java *JavaRoutes
disabled map[svc.InstrumentableType]struct{}
timeout time.Duration

// testing related
javaExtractRoutes func(pid int32) (*RouteHarvesterResult, error)
}

type RouteHarvesterResultKind uint8
Expand All @@ -30,29 +36,79 @@ type RouteHarvesterResult struct {
Kind RouteHarvesterResultKind
}

func NewRouteHarvester(disabled []string) *RouteHarvester {
// HarvestError represents an error that occurred during route harvesting
type HarvestError struct {
Message string
}

func (e *HarvestError) Error() string {
return e.Message
}

func NewRouteHarvester(disabled []string, timeout time.Duration) *RouteHarvester {
dMap := map[svc.InstrumentableType]struct{}{}
for _, lang := range disabled {
if strings.ToLower(lang) == "java" {
dMap[svc.InstrumentableJava] = struct{}{}
}
}

return &RouteHarvester{
h := &RouteHarvester{
log: slog.With("component", "route.harvester"),
java: NewJavaRoutesHarvester(),
disabled: dMap,
timeout: timeout,
}

h.javaExtractRoutes = h.java.ExtractRoutes

return h
}

func (h *RouteHarvester) HarvestRoutes(fileInfo *exec.FileInfo) (*RouteHarvesterResult, error) {
if fileInfo.Service.SDKLanguage == svc.InstrumentableJava {
if _, ok := h.disabled[svc.InstrumentableJava]; !ok {
return h.java.ExtractRoutes(fileInfo.Pid)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()

// Channel to receive the result
resultChan := make(chan *RouteHarvesterResult, 1)
errorChan := make(chan error, 1)

// Run the harvesting in a goroutine
go func() {
defer func() {
if r := recover(); r != nil {
h.log.Error("route harvesting failed", "error", r)
errorChan <- &HarvestError{Message: "harvesting failed"}
}
}()

if fileInfo.Service.SDKLanguage == svc.InstrumentableJava {
if _, ok := h.disabled[svc.InstrumentableJava]; !ok {
result, err := h.javaExtractRoutes(fileInfo.Pid)
if err != nil {
errorChan <- err
return
}
resultChan <- result
} else {
resultChan <- nil
}
} else {
resultChan <- nil
}
}
}()

return nil, nil
// Wait for either completion or timeout
select {
case result := <-resultChan:
return result, nil
case err := <-errorChan:
return nil, err
case <-ctx.Done():
h.log.Warn("route harvesting timed out", "timeout", h.timeout, "pid", fileInfo.Pid)
return nil, &HarvestError{Message: "route harvesting timed out"}
}
}

func RouteMatcherFromResult(r RouteHarvesterResult) route.Matcher {
Expand Down
200 changes: 200 additions & 0 deletions pkg/components/transform/route/harvest/harvester_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package harvest

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/obi/pkg/components/exec"
"go.opentelemetry.io/obi/pkg/components/svc"
)

// successfulExtractRoutes simulates a successful route extraction
func successfulExtractRoutes(_ int32) (*RouteHarvesterResult, error) {
return &RouteHarvesterResult{
Routes: []string{"/api/users", "/api/orders"},
Kind: CompleteRoutes,
}, nil
}

// errorExtractRoutes simulates an error during route extraction
func errorExtractRoutes(_ int32) (*RouteHarvesterResult, error) {
return nil, errors.New("failed to connect to Java process")
}

// timeoutExtractRoutes simulates a slow operation that will timeout
func timeoutExtractRoutes(_ int32) (*RouteHarvesterResult, error) {
// Sleep longer than any reasonable timeout
time.Sleep(5 * time.Second)
return &RouteHarvesterResult{
Routes: []string{"/api/delayed"},
Kind: CompleteRoutes,
}, nil
}

// panicExtractRoutes simulates a panic during route extraction
func panicExtractRoutes(_ int32) (*RouteHarvesterResult, error) {
panic("unexpected error in java route extraction")
}

// slowButSuccessfulExtractRoutes simulates a slow but successful operation
func slowButSuccessfulExtractRoutes(_ int32) (*RouteHarvesterResult, error) {
time.Sleep(50 * time.Millisecond) // Slow but within timeout
return &RouteHarvesterResult{
Routes: []string{"/api/slow"},
Kind: PartialRoutes,
}, nil
}

// emptyResultExtractRoutes simulates successful extraction with no routes
func emptyResultExtractRoutes(_ int32) (*RouteHarvesterResult, error) {
return &RouteHarvesterResult{
Routes: []string{},
Kind: CompleteRoutes,
}, nil
}

func createTestFileInfo(language svc.InstrumentableType) *exec.FileInfo {
return &exec.FileInfo{
Pid: 12345,
Service: svc.Attrs{
SDKLanguage: language,
},
}
}

func TestHarvestRoutes_Successful(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 1*time.Second)
harvester.javaExtractRoutes = successfulExtractRoutes

fileInfo := createTestFileInfo(svc.InstrumentableJava)

result, err := harvester.HarvestRoutes(fileInfo)

require.NoError(t, err)
require.NotNil(t, result)
assert.Equal(t, []string{"/api/users", "/api/orders"}, result.Routes)
assert.Equal(t, CompleteRoutes, result.Kind)
}

func TestHarvestRoutes_Error(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 1*time.Second)
harvester.javaExtractRoutes = errorExtractRoutes

fileInfo := createTestFileInfo(svc.InstrumentableJava)

result, err := harvester.HarvestRoutes(fileInfo)

require.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "failed to connect to Java process")
}

func TestHarvestRoutes_Timeout(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 100*time.Millisecond) // Short timeout
harvester.javaExtractRoutes = timeoutExtractRoutes

fileInfo := createTestFileInfo(svc.InstrumentableJava)

start := time.Now()
result, err := harvester.HarvestRoutes(fileInfo)
elapsed := time.Since(start)

require.Error(t, err)
assert.Nil(t, result)

// Check that it's a HarvestError with timeout message
var harvestErr *HarvestError
require.ErrorAs(t, err, &harvestErr)
assert.Equal(t, "route harvesting timed out", harvestErr.Message)

// Ensure it actually timed out quickly (within reasonable bounds)
assert.Less(t, elapsed, 200*time.Millisecond)
assert.Greater(t, elapsed, 90*time.Millisecond)
}

func TestHarvestRoutes_Panic(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 1*time.Second)
harvester.javaExtractRoutes = panicExtractRoutes

fileInfo := createTestFileInfo(svc.InstrumentableJava)

result, err := harvester.HarvestRoutes(fileInfo)

require.Error(t, err)
assert.Nil(t, result)

// Check that panic was caught and converted to HarvestError
var harvestErr *HarvestError
require.ErrorAs(t, err, &harvestErr)
assert.Equal(t, "harvesting failed", harvestErr.Message)
}

func TestHarvestRoutes_SlowButSuccessful(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 200*time.Millisecond) // Enough time for slow operation
harvester.javaExtractRoutes = slowButSuccessfulExtractRoutes

fileInfo := createTestFileInfo(svc.InstrumentableJava)

result, err := harvester.HarvestRoutes(fileInfo)

require.NoError(t, err)
require.NotNil(t, result)
assert.Equal(t, []string{"/api/slow"}, result.Routes)
assert.Equal(t, PartialRoutes, result.Kind)
}

func TestHarvestRoutes_EmptyResult(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 1*time.Second)
harvester.javaExtractRoutes = emptyResultExtractRoutes

fileInfo := createTestFileInfo(svc.InstrumentableJava)

result, err := harvester.HarvestRoutes(fileInfo)

require.NoError(t, err)
require.NotNil(t, result)
assert.Empty(t, result.Routes)
assert.Equal(t, CompleteRoutes, result.Kind)
}

func TestHarvestRoutes_NonJavaLanguage(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 1*time.Second)
// javaExtractRoutes should not be called for non-Java languages
harvester.javaExtractRoutes = func(_ int32) (*RouteHarvesterResult, error) {
t.Fatal("javaExtractRoutes should not be called for non-Java languages")
return nil, nil
}

fileInfo := createTestFileInfo(svc.InstrumentableGolang)

result, err := harvester.HarvestRoutes(fileInfo)

require.NoError(t, err)
assert.Nil(t, result) // Should return nil for non-Java languages
}

func TestHarvestRoutes_MultipleTimeouts(t *testing.T) {
harvester := NewRouteHarvester([]string{}, 50*time.Millisecond)
harvester.javaExtractRoutes = timeoutExtractRoutes

fileInfo := createTestFileInfo(svc.InstrumentableJava)

// Test multiple calls to ensure timeout behavior is consistent
for i := 0; i < 3; i++ {
result, err := harvester.HarvestRoutes(fileInfo)

require.Error(t, err, "iteration %d should timeout", i)
assert.Nil(t, result, "iteration %d should return nil result", i)

var harvestErr *HarvestError
require.ErrorAs(t, err, &harvestErr, "iteration %d should return HarvestError", i)
assert.Equal(t, "route harvesting timed out", harvestErr.Message, "iteration %d should have timeout message", i)
}
}
5 changes: 3 additions & 2 deletions pkg/obi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ var DefaultConfig = Config{
Metadata: map[string]*services.GlobAttr{"k8s_namespace": &k8sDefaultNamespacesGlob},
},
},
MinProcessAge: 5 * time.Second,
DefaultOtlpGRPCPort: 4317,
MinProcessAge: 5 * time.Second,
DefaultOtlpGRPCPort: 4317,
RouteHarvesterTimeout: 10 * time.Second,
},
NodeJS: NodeJSConfig{
Enabled: true,
Expand Down
3 changes: 2 additions & 1 deletion pkg/obi/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ discovery:
Metadata: map[string]*services.GlobAttr{"k8s_namespace": &k8sDefaultNamespacesGlob},
},
},
DefaultOtlpGRPCPort: 4317,
DefaultOtlpGRPCPort: 4317,
RouteHarvesterTimeout: 10 * time.Second,
},
NodeJS: NodeJSConfig{
Enabled: true,
Expand Down
2 changes: 2 additions & 0 deletions pkg/services/criteria.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type DiscoveryConfig struct {
// Disables generation of span metrics of services which are already instrumented
ExcludeOTelInstrumentedServicesSpanMetrics bool `yaml:"exclude_otel_instrumented_services_span_metrics" env:"OTEL_EBPF_EXCLUDE_OTEL_INSTRUMENTED_SERVICES_SPAN_METRICS"`

RouteHarvesterTimeout time.Duration `yaml:"route_harvester_timeout" env:"OTEL_EBPF_ROUTE_HARVESTER_TIMEOUT"`

DisabledRouteHarvesters []string `yaml:"disabled_route_harvesters"`
}

Expand Down
Loading