From 234760951018b45338a60405048dd936b872759b Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 28 Jun 2024 21:48:32 +0000 Subject: [PATCH] Properly handle shutdown the collector gracefully --- opentelemetry-otelcol/go/shared.go | 83 +++++++++++++++------ opentelemetry-otelcol/go/shared/shared.go | 63 ---------------- opentelemetry-otelcol/test-requirements.txt | 2 + opentelemetry-otelcol/tests/test_otelcol.py | 29 +++++-- 4 files changed, 83 insertions(+), 94 deletions(-) delete mode 100644 opentelemetry-otelcol/go/shared/shared.go diff --git a/opentelemetry-otelcol/go/shared.go b/opentelemetry-otelcol/go/shared.go index be8fa0912b8..c93b40bacfa 100644 --- a/opentelemetry-otelcol/go/shared.go +++ b/opentelemetry-otelcol/go/shared.go @@ -13,6 +13,7 @@ import ( "context" "errors" "fmt" + "log" "math/rand" "sync" "time" @@ -55,14 +56,14 @@ func NewCollector(yaml *C.char) collectorInstance { func ShutdownCollector(c collectorInstance) collectorInstance { mu.Lock() defer mu.Unlock() - fmt.Printf("Stopping collector handle ID: %v\n", c.handle) + log.Printf("Stopping collector handle ID: %v", c.handle) cancel, ok := handlesToCancel[c.handle] if !ok { c.setError(fmt.Errorf("CollectorInstance with handle %v is not known", c.handle)) return c } - fmt.Printf("\n\nGot handle %v, cancel func %v\n\n", c.handle, cancel) + log.Printf("Got handle %v, cancel func %v", c.handle, cancel) c.setError(cancel()) delete(handlesToCancel, c.handle) return c @@ -99,32 +100,62 @@ func writeToCharBuf(buf []C.char, gostring string) { } } +var errTerminatedEarly = errors.New("collector terminated unexpectedly without error, check logs for details") + func mainNonBlocking(ctx context.Context, yamlUri string) (func() error, error) { - ctx, cancel := context.WithCancel(ctx) + col, err := createOtelcol(ctx, yamlUri) + if err != nil { + return nil, err + } + done := make(chan error) + // Run collector in background go func() { - defer cancel() - done <- main2(ctx, yamlUri) + // run with detached context + done <- col.Run(context.WithoutCancel(ctx)) }() - // Wait a short time to see if we fail fast, otherwise return without error to the caller - t := time.NewTimer(time.Millisecond * 50) - select { - case err := <-done: + shutdown := func() error { + log.Print("Starting shutdown") + col.Shutdown() + log.Print("Shutdown done") + defer log.Print("Receiver return value in channel") + return <-done + } + + // Poll collector state until running. Unfortunately there is no channel to subscribe to + // state changes + ticker := time.NewTicker(time.Millisecond * 5) + for col.GetState() == otelcol.StateStarting { + select { + case <-ticker.C: + case <-ctx.Done(): + log.Printf("while waiting for collector to start: %v", ctx.Err().Error()) + return nil, errors.Join( + fmt.Errorf("collector never entered running state (state=%v): %w", col.GetState(), ctx.Err()), + shutdown(), + ) + case err := <-done: + if err != nil { + return nil, err + } + return nil, errTerminatedEarly + } + } + + // check if passed Running + if col.GetState() >= otelcol.StateClosing { + err := <-done if err != nil { return nil, err } - return nil, errors.New("collector terminated early without error, check logs for more info") - case <-t.C: - return func() error { - cancel() - return <-done - }, nil + return nil, errTerminatedEarly } -} -func main2(ctx context.Context, yamlUri string) error { + return shutdown, nil +} +func createOtelcol(ctx context.Context, yamlUri string) (*otelcol.Collector, error) { info := component.BuildInfo{ Command: "otelcol-contrib", Description: "OpenTelemetry Collector Contrib", @@ -134,6 +165,8 @@ func main2(ctx context.Context, yamlUri string) error { set := otelcol.CollectorSettings{ BuildInfo: info, Factories: components, + // Let the python process handle gracefully shutting down + DisableGracefulShutdown: true, ConfigProviderSettings: otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ URIs: []string{yamlUri}, @@ -151,12 +184,14 @@ func main2(ctx context.Context, yamlUri string) error { }, } - return runInteractive2(ctx, set) -} + col, err := otelcol.NewCollector(set) + if err != nil { + return nil, err + } -func runInteractive2(ctx context.Context, params otelcol.CollectorSettings) error { - cmd := otelcol.NewCommand(params) - // Unset args - cmd.SetArgs([]string{}) - return cmd.ExecuteContext(ctx) + // Fail fast + if err := col.DryRun(ctx); err != nil { + return nil, err + } + return col, nil } diff --git a/opentelemetry-otelcol/go/shared/shared.go b/opentelemetry-otelcol/go/shared/shared.go deleted file mode 100644 index 854a92a4557..00000000000 --- a/opentelemetry-otelcol/go/shared/shared.go +++ /dev/null @@ -1,63 +0,0 @@ -package main - -/* -struct CollectorInstance { - // c string containing the error or otherwise an empty string if successful - char err[128]; - unsigned int handle; -}; -*/ -import "C" -import ( - "fmt" - "math/rand" -) - -//export NewCollector -func NewCollector(yaml *C.char) collectorInstance { - yamlUri := "yaml:" + C.GoString(yaml) - return newCollectorInstance(main2(yamlUri)) -} - -//export ShutdownCollector -func ShutdownCollector(c collectorInstance) collectorInstance { - fmt.Printf("Stopping collector handle ID: %v\n", c.handle) - return c -} - -type collectorInstance C.struct_CollectorInstance - -func newCollectorInstance(err error) collectorInstance { - ret := collectorInstance{ - handle: C.uint(rand.Uint32()), - } - ret.setError(err) - return ret -} - -func (c *collectorInstance) setError(err error) { - if err != nil { - writeToCharBuf(c.err[:], err.Error()) - } -} - -func writeToCharBuf(buf []C.char, gostring string) { - var ( - i int - b byte - ) - for i, b = range []byte(gostring) { - if i == len(buf)-1 { - break - } - buf[i] = C.char(b) - } - // null terminate - buf[i+1] = 0 -} - -func main2(string) error { - return nil -} - -func main() {} diff --git a/opentelemetry-otelcol/test-requirements.txt b/opentelemetry-otelcol/test-requirements.txt index 90247015a0b..4ae6e67b66a 100644 --- a/opentelemetry-otelcol/test-requirements.txt +++ b/opentelemetry-otelcol/test-requirements.txt @@ -18,3 +18,5 @@ zipp==3.17.0 -e opentelemetry-semantic-conventions -e tests/opentelemetry-test-utils -e opentelemetry-api +-e exporter/opentelemetry-exporter-otlp-proto-grpc +-e exporter/opentelemetry-exporter-otlp-proto-common/ diff --git a/opentelemetry-otelcol/tests/test_otelcol.py b/opentelemetry-otelcol/tests/test_otelcol.py index d01d0724df4..0e3d2843da1 100644 --- a/opentelemetry-otelcol/tests/test_otelcol.py +++ b/opentelemetry-otelcol/tests/test_otelcol.py @@ -16,7 +16,13 @@ from time import sleep from unittest import TestCase +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) from opentelemetry.otelcol import Collector +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor class TestOtelCol(TestCase): @@ -28,31 +34,40 @@ def test_otelcolmain(self) -> None: otlp: protocols: grpc: - # http: + http: processors: batch: exporters: - otlp: - endpoint: otelcol:4317 + debug: + verbosity: detailed + googlecloud: + project: ${env:PROJECT_ID} service: pipelines: traces: receivers: [otlp] processors: [batch] - exporters: [otlp] + exporters: [debug, googlecloud] metrics: receivers: [otlp] processors: [batch] - exporters: [otlp] + exporters: [debug] logs: receivers: [otlp] processors: [batch] - exporters: [otlp] + exporters: [debug] """ ) ) - sleep(1) + + tp = TracerProvider( + resource=Resource.create({"service.name": "foobar"}) + ) + tp.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + with tp.get_tracer("foo").start_as_current_span("example-span"): + sleep(0.1) + tp.shutdown() col.shutdown()