Skip to content

Commit

Permalink
updated udp-server removed processor abstraction
Browse files Browse the repository at this point in the history
Signed-off-by: Avinash <[email protected]>
  • Loading branch information
avinpy-255 committed Mar 1, 2025
1 parent e87c298 commit b06c4e5
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 21 deletions.
107 changes: 86 additions & 21 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"github.com/jaegertracing/jaeger-idl/thrift-gen/agent"
"github.com/jaegertracing/jaeger-idl/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger-idl/thrift-gen/zipkincore"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver/internal/processor"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/multierr"
Expand All @@ -44,8 +44,9 @@ type jReceiver struct {
grpc *grpc.Server
collectorServer *http.Server

agentProcessors []processor.Processor
agentServer *http.Server
binaryUDPServer *UDPServer
compactUDPServer *UDPServer
agentServer *http.Server

goroutines sync.WaitGroup

Expand All @@ -55,6 +56,12 @@ type jReceiver struct {
httpObsrecv *receiverhelper.ObsReport
}

type obsReportingConsumer struct {
nextConsumer consumer.Traces
obsrecv *receiverhelper.ObsReport
format string
}

const (
agentTransportBinary = "udp_thrift_binary"
agentTransportCompact = "udp_thrift_compact"
Expand Down Expand Up @@ -105,6 +112,17 @@ func newJaegerReceiver(
}, nil
}

func (oc *obsReportingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
ctx = oc.obsrecv.StartTracesOp(ctx)
err := oc.nextConsumer.ConsumeTraces(ctx, td)
oc.obsrecv.EndTracesOp(ctx, oc.format, td.SpanCount(), err)
return err
}

func (oc *obsReportingConsumer) Capabilities() consumer.Capabilities {
return oc.nextConsumer.Capabilities()
}

func (jr *jReceiver) Start(ctx context.Context, host component.Host) error {
if err := jr.startAgent(); err != nil {
return err
Expand All @@ -121,12 +139,19 @@ func (jr *jReceiver) Shutdown(ctx context.Context) error {
errs = multierr.Append(errs, aerr)
}
}
for _, processor := range jr.agentProcessors {
if err := processor.Stop(); err != nil {
if jr.binaryUDPServer != nil {
if err := jr.binaryUDPServer.Stop(); err != nil {
errs = multierr.Append(errs, err)
}
}

if jr.compactUDPServer != nil {
if err := jr.compactUDPServer.Stop(); err != nil {
errs = multierr.Append(errs, err)
}

}

if jr.collectorServer != nil {
if cerr := jr.collectorServer.Shutdown(ctx); cerr != nil {
errs = multierr.Append(errs, cerr)
Expand Down Expand Up @@ -205,44 +230,84 @@ func (jr *jReceiver) startAgent() error {
return err
}

h := &agentHandler{
wrappedConsumer := &obsReportingConsumer{
nextConsumer: jr.nextConsumer,
obsrecv: obsrecv,
format: thriftFormat,
}

p, err := processor.NewThriftProcessor(
udpServer, err := NewUDPServer(
jr.config.ThriftBinaryUDP.Endpoint,
jr.config.ThriftBinaryUDP.ServerConfigUDP.MaxPacketSize,
wrappedConsumer,
apacheThrift.NewTBinaryProtocolFactoryConf(nil),
h,
jr.settings.Logger,
)
if err != nil {
return err
return fmt.Errorf("failed to create Binary Thrift UDP server: %w", err)
}
jr.agentProcessors = append(jr.agentProcessors, p)

if err = udpServer.Start(); err != nil {
return fmt.Errorf("failed to start Binary Thrift UDP server: %w", err)
}

jr.binaryUDPServer = udpServer

jr.settings.Logger.Info("Starting UDP server for Binary Thrift", zap.String("endpoint", jr.config.ThriftBinaryUDP.Endpoint))
}

for _, p := range jr.agentProcessors {
if err := p.Start(); err != nil {
if jr.config.ThriftCompactUDP != nil {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: jr.id,
Transport: agentTransportCompact,
ReceiverCreateSettings: jr.settings,
})
if err != nil {
return err
}
}
// Create a wrapped consumer that includes the metrics
wrappedConsumer := &obsReportingConsumer{
nextConsumer: jr.nextConsumer,
obsrecv: obsrecv,
format: thriftFormat,
}

for _, p := range jr.agentProcessors {
if err := p.Start(); err != nil {
return err
// Create and start the UDP server for Compact Thrift
udpServer, err := NewUDPServer(
jr.config.ThriftCompactUDP.Endpoint,
jr.config.ThriftCompactUDP.ServerConfigUDP.MaxPacketSize,
wrappedConsumer,
apacheThrift.NewTCompactProtocolFactoryConf(nil),
jr.settings.Logger,
)
if err != nil {
return fmt.Errorf("failed to create Compact Thrift UDP server: %w", err)
}

if err = udpServer.Start(); err != nil {
return fmt.Errorf("failed to start Compact Thrift UDP server: %w", err)
}

jr.compactUDPServer = udpServer
jr.settings.Logger.Info("Started UDP server for Compact Thrift", zap.String("endpoint", jr.config.ThriftCompactUDP.Endpoint))
}

jr.goroutines.Add(len(jr.agentProcessors))
for _, p := range jr.agentProcessors {
go func(p processor.Processor) {
if jr.binaryUDPServer != nil {
jr.goroutines.Add(1)
go func() {
defer jr.goroutines.Done()
<-p.Done()
}(p)
<-jr.binaryUDPServer.Done()
}()
}

if jr.compactUDPServer != nil {
jr.goroutines.Add(1)
go func() {

defer jr.goroutines.Done()
<-jr.compactUDPServer.Done()
}()

}

return nil
Expand Down
174 changes: 174 additions & 0 deletions receiver/jaegerreceiver/udp_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package jaegerreceiver

import (
"context"
"errors"
"net"
"sync"
"sync/atomic"

apacheThrift "github.com/apache/thrift/lib/go/thrift"
"github.com/jaegertracing/jaeger-idl/thrift-gen/agent"
"github.com/jaegertracing/jaeger-idl/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger-idl/thrift-gen/zipkincore"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"

jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
)

type UDPServer struct {
conn *net.UDPConn
maxPacketSize int
protoFactory apacheThrift.TProtocolFactory
processor *agent.AgentProcessor
handler agent.Agent
logger *zap.Logger

running atomic.Bool
stopWG sync.WaitGroup
stopOnce sync.Once
stopChan chan struct{}
}

func NewUDPServer(
endpoint string,
maxPacketSize int,
nextConsumer consumer.Traces,
protoFactory apacheThrift.TProtocolFactory,
logger *zap.Logger,
) (*UDPServer, error) {
if endpoint == "" {
return nil, errors.New("empty endpoint")
}
if nextConsumer == nil {
return nil, errors.New("nil nextConsumer")
}

host, port, err := net.SplitHostPort(endpoint)
if err != nil {
return nil, err
}

addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, port))
if err != nil {
return nil, err
}

conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}

handler := &jaegerThriftHandler{
nextConsumer: nextConsumer,
logger: logger,
}

s := &UDPServer{
conn: conn,
maxPacketSize: maxPacketSize,
protoFactory: protoFactory,
processor: agent.NewAgentProcessor(handler),
handler: handler,
logger: logger,
stopChan: make(chan struct{}),
}

return s, nil
}

func (s *UDPServer) Start() error {
if s.running.Swap(true) {
return errors.New("UDP server has already started")
}

s.stopWG.Add(1)
go func() {
defer s.stopWG.Done()
s.serve()
}()

s.logger.Info("Jaeger UDP server started", zap.String("address", s.conn.LocalAddr().String()))
return nil
}

func (s *UDPServer) serve() {
buf := make([]byte, s.maxPacketSize)

for {
select {
case <-s.stopChan:
return
default:
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
if !s.running.Load() {
return
}
s.logger.Error("Failed to read UDP packet", zap.Error(err))
continue
}

s.handleMsg(buf[:n])
}
}
}

func (s *UDPServer) handleMsg(msg []byte) {
transport := apacheThrift.NewTMemoryBufferLen(len(msg))
if _, err := transport.Write(msg); err != nil {
s.logger.Error("Cannot write to memory buffer", zap.Error(err))
return
}

protocol := s.protoFactory.GetProtocol(transport)
ctx := context.Background()

_, err := s.processor.Process(ctx, protocol, protocol)
if err != nil {
s.logger.Error("Failed to process UDP packet", zap.Error(err))
}
}

func (s *UDPServer) Stop() error {
var err error
s.stopOnce.Do(func() {
s.running.Store(false)
close(s.stopChan)
err = s.conn.Close()
s.stopWG.Wait()
s.logger.Info("Jaeger UDP server stopped")
})
return err
}

func (s *UDPServer) Done() <-chan struct{} {
return s.stopChan
}

type jaegerThriftHandler struct {
nextConsumer consumer.Traces
logger *zap.Logger
}

func (h *jaegerThriftHandler) EmitZipkinBatch(ctx context.Context, spans []*zipkincore.Span) error {
return errors.New("unsupported zipkin receiver")
}

func (h *jaegerThriftHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
if batch == nil || len(batch.Spans) == 0 {
return nil
}

td, err := jaegertranslator.ThriftToTraces(batch)
if err != nil {
h.logger.Error("Failed to convert Jaeger batch to traces", zap.Error(err))
return err
}

return h.nextConsumer.ConsumeTraces(ctx, td)
}

0 comments on commit b06c4e5

Please sign in to comment.