Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to send loggregator envelopes type LOG and EVENT to OtelCollector as logs #595

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions jobs/loggr-forwarder-agent-windows/monit
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"AGENT_TAGS" => tags.map { |k, v| "#{k}:#{v}" }.join(","),
"DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"),
"EMIT_OTEL_TRACES" => "#{p("temporary_emit_otel_traces")}",
"EMIT_EVENTS_AS_OTEL_LOGS" => "#{p("emit_events_as_otel_logs")}",
"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
"METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt",
Expand Down
4 changes: 4 additions & 0 deletions jobs/loggr-forwarder-agent-windows/spec
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ properties:
description: "Whether to emit traces to OpenTelemetry Collector downstream consumers"
default: false

emit_events_as_otel_logs:
description: "Whether to emit events as logs to OpenTelemetry Collector downstream consumers"
default: false

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
4 changes: 4 additions & 0 deletions jobs/loggr-forwarder-agent/spec
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ properties:
description: "Whether to emit traces to OpenTelemetry Collector downstream consumers"
default: false

emit_events_as_otel_logs:
description: "Whether to emit events as logs to OpenTelemetry Collector downstream consumers"
default: false

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
1 change: 1 addition & 0 deletions jobs/loggr-forwarder-agent/templates/bpm.yml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

"DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"),
"EMIT_OTEL_TRACES" => p("temporary_emit_otel_traces"),
"EMIT_EVENTS_AS_OTEL_LOGS" => p("emit_events_as_otel_logs"),

"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
Expand Down
61 changes: 61 additions & 0 deletions src/cmd/forwarder-agent/app/app_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
Expand Down Expand Up @@ -328,3 +329,63 @@ func (s *spyOtelColTraceServer) Export(_ context.Context, req *coltracepb.Export
func (s *spyOtelColTraceServer) close() {
s.srv.Stop()
}

// A fake OTel Collector logs gRPC server that captures requests made to it.
type spyOtelColLogServer struct {
collogspb.UnimplementedLogsServiceServer

srv *grpc.Server
addr string

requests chan *collogspb.ExportLogsServiceRequest
}

// Creates a spyOtelColLogServer, starts it listening on a random port,
// registers it as a gRPC service, and writes out a temp file for the forwarder
// agent to recognize it as a destination. The cfgPath it accepts is the folder
// under which to write the temp file.
func startSpyOtelColLogServer(cfgPath string, tc *testhelper.TestCerts, commonName string) *spyOtelColLogServer {
serverCreds, err := plumbing.NewServerCredentials(
tc.Cert(commonName),
tc.Key(commonName),
tc.CA(),
)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

lis, err := net.Listen("tcp", "127.0.0.1:")
ExpectWithOffset(1, err).NotTo(HaveOccurred())

s := &spyOtelColLogServer{
srv: grpc.NewServer(grpc.Creds(serverCreds)),
requests: make(chan *collogspb.ExportLogsServiceRequest, 10000),
addr: lis.Addr().String(),
}

collogspb.RegisterLogsServiceServer(s.srv, s)
go s.srv.Serve(lis) //nolint:errcheck

port, err := strconv.Atoi(strings.Split(s.addr, ":")[1])
ExpectWithOffset(1, err).NotTo(HaveOccurred())

dir, err := os.MkdirTemp(cfgPath, "")
ExpectWithOffset(1, err).ToNot(HaveOccurred())
tmpfn := filepath.Join(dir, "ingress_port.yml")

contents := fmt.Sprintf(`---
ingress: %d
protocol: otelcol
`, port)
err = os.WriteFile(tmpfn, []byte(contents), 0600)
ExpectWithOffset(1, err).ToNot(HaveOccurred())

return s
}

func (s *spyOtelColLogServer) Export(_ context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) {
s.requests <- req
return &collogspb.ExportLogsServiceResponse{}, nil
}

func (s *spyOtelColLogServer) close() {
s.srv.Stop()
}
1 change: 1 addition & 0 deletions src/cmd/forwarder-agent/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
Tags map[string]string `env:"AGENT_TAGS"`
DebugMetrics bool `env:"DEBUG_METRICS, report"`
EmitOTelTraces bool `env:"EMIT_OTEL_TRACES, report"`
EmitEventsAsOTelLogs bool `env:"EMIT_EVENTS_AS_OTEL_LOGS, report"`
}

// LoadConfig will load the configuration for the forwarder agent from the
Expand Down
12 changes: 7 additions & 5 deletions src/cmd/forwarder-agent/app/forwarder_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ForwarderAgent struct {
tags map[string]string
debugMetrics bool
emitOTelTraces bool
emitEventsAsOTelLogs bool
}

type Metrics interface {
Expand Down Expand Up @@ -73,6 +74,7 @@ func NewForwarderAgent(
tags: cfg.Tags,
debugMetrics: cfg.MetricsServer.DebugMetrics,
emitOTelTraces: cfg.EmitOTelTraces,
emitEventsAsOTelLogs: cfg.EmitEventsAsOTelLogs,
}
}

Expand All @@ -96,7 +98,7 @@ func (s *ForwarderAgent) Run() {
}))

dests := downstreamDestinations(s.downstreamFilePattern, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.emitEventsAsOTelLogs, s.log)
tagger := egress_v2.NewTagger(s.tags)
ew := egress_v2.NewEnvelopeWriter(
multiWriter{writers: writers},
Expand Down Expand Up @@ -209,13 +211,13 @@ func downstreamDestinations(pattern string, l *log.Logger) []destination {
return dests
}

func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces bool, l *log.Logger) []Writer {
func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces, emitEventsAsOTelLogs bool, l *log.Logger) []Writer {
var writers []Writer
for _, d := range dests {
var w Writer
switch d.Protocol {
case "otelcol":
w = otelCollectorClient(d, grpc, m, emitOTelTraces, l)
w = otelCollectorClient(d, grpc, m, emitOTelTraces, emitEventsAsOTelLogs, l)
default:
w = loggregatorClient(d, grpc, m, l)
}
Expand All @@ -224,7 +226,7 @@ func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces
return writers
}

func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool, l *log.Logger) Writer {
func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emitEvents bool, l *log.Logger) Writer {
clientCreds, err := tlsconfig.Build(
tlsconfig.WithInternalServiceDefaults(),
tlsconfig.WithIdentityFromFile(grpc.CertFile, grpc.KeyFile),
Expand Down Expand Up @@ -252,7 +254,7 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool
}),
)

dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces), gendiodes.AlertFunc(func(missed int) {
dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces, emitEvents), gendiodes.AlertFunc(func(missed int) {
expired.Add(float64(missed))
}), timeoutwaitgroup.New(time.Minute))

Expand Down
101 changes: 99 additions & 2 deletions src/cmd/forwarder-agent/app/forwarder_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"code.cloudfoundry.org/loggregator-agent-release/src/pkg/config"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"

Expand Down Expand Up @@ -349,24 +350,42 @@ var _ = Describe("App", func() {
})
})

Context("when an OTel Collector is registered to forward to", func() {
Context("when an OTel Collector is registered to", func() {
jriguera marked this conversation as resolved.
Show resolved Hide resolved
var (
otelMetricsServer *spyOtelColMetricServer
otelTraceServer *spyOtelColTraceServer
otelLogsServer *spyOtelColLogServer
)

BeforeEach(func() {
otelMetricsServer = startSpyOtelColMetricServer(ingressCfgPath, agentCerts, "otel-collector")
otelTraceServer = startSpyOtelColTraceServer(ingressCfgPath, agentCerts, "otel-collector")
otelLogsServer = startSpyOtelColLogServer(ingressCfgPath, agentCerts, "otel-collector")
agentCfg.EmitOTelTraces = true
agentCfg.EmitEventsAsOTelLogs = true
})

JustBeforeEach(func() {
// Because the event being sent JustBeforeEach in the main test, channels need to be emptied
for len(otelMetricsServer.requests) > 0 {
<-otelMetricsServer.requests
}
for len(otelTraceServer.requests) > 0 {
<-otelTraceServer.requests
}
// test-title events
for len(otelLogsServer.requests) > 0 {
<-otelLogsServer.requests
}
})

AfterEach(func() {
otelMetricsServer.close()
otelTraceServer.close()
otelLogsServer.close()
})

DescribeTable("some envelopes are not forwarded",
DescribeTable("not forward log and event envelopes to otel metrics",
jriguera marked this conversation as resolved.
Show resolved Hide resolved
func(e *loggregator_v2.Envelope) {
ingressClient.Emit(e)
Consistently(otelMetricsServer.requests, 3).ShouldNot(Receive())
Expand All @@ -375,6 +394,16 @@ var _ = Describe("App", func() {
Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}),
)

DescribeTable("not forward counters, gagues and timers envelopes to otel logs",
jriguera marked this conversation as resolved.
Show resolved Hide resolved
func(e *loggregator_v2.Envelope) {
ingressClient.Emit(e)
Consistently(otelLogsServer.requests, 3).ShouldNot(Receive())
},
Entry("drops counters", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Counter{}}),
Entry("drops gauges", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Gauge{}}),
Entry("drops timers", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}),
)

It("forwards counters", func() {
name := "test-counter-name"
ingressClient.EmitCounter(name)
Expand Down Expand Up @@ -408,6 +437,74 @@ var _ = Describe("App", func() {
Expect(trace.GetName()).To(Equal(name))
})

It("forwards logs", func() {
body := "test log body"
ingressClient.EmitLog(body, loggregator.WithStdout())

var req *collogspb.ExportLogsServiceRequest
Eventually(otelLogsServer.requests).Should(Receive(&req))

log := req.ResourceLogs[0].ScopeLogs[0].LogRecords[0]
Expect(log.GetBody().GetStringValue()).To(Equal(body))
})

It("forwards events", func() {
title := "event title"
body := "event body"
ingressClient.EmitEvent(context.TODO(), title, body)

var req *collogspb.ExportLogsServiceRequest
Eventually(otelLogsServer.requests).Should(Receive(&req))

log := req.ResourceLogs[0].ScopeLogs[0].LogRecords[0]
Expect(len(log.GetBody().GetKvlistValue().GetValues())).To(Equal(2))
for _, v := range log.GetBody().GetKvlistValue().GetValues() {
switch v.GetKey() {
case "title":
Expect(v.GetValue().GetStringValue()).To(Equal(title))
case "body":
Expect(v.GetValue().GetStringValue()).To(Equal(body))
default:
Expect(v.GetKey()).ToNot(HaveOccurred())
}
}
})

Context("when support for forwarding events as traces is not active", func() {
BeforeEach(func() {
agentCfg.EmitEventsAsOTelLogs = false
})

It("only emits events to other destinations", func() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer wg.Wait()
defer cancel()

wg.Add(1)
go func() {
defer wg.Done()

ticker := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
ingressClient.EmitEvent(context.TODO(), "title", "event body")
}
}
}()

var e *loggregator_v2.Envelope
Eventually(ingressServer1.envelopes, 5).Should(Receive(&e))
Expect(e.GetEvent().GetTitle()).To(Equal("title"))
Expect(e.GetEvent().GetBody()).To(Equal("event body"))
Consistently(otelLogsServer.requests, 5).ShouldNot(Receive())
})
})

Context("when support for forwarding timers as traces is not active", func() {
BeforeEach(func() {
agentCfg.EmitOTelTraces = false
Expand Down
Loading