Skip to content

Commit

Permalink
Properly handle graceful shutdown of the collector
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Jun 28, 2024
1 parent d6dde88 commit 2347609
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 94 deletions.
83 changes: 59 additions & 24 deletions opentelemetry-otelcol/go/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -55,14 +56,14 @@ func NewCollector(yaml *C.char) collectorInstance {
func ShutdownCollector(c collectorInstance) collectorInstance {
mu.Lock()
defer mu.Unlock()
fmt.Printf("Stopping collector handle ID: %v\n", c.handle)
log.Printf("Stopping collector handle ID: %v", c.handle)
cancel, ok := handlesToCancel[c.handle]
if !ok {
c.setError(fmt.Errorf("CollectorInstance with handle %v is not known", c.handle))
return c
}

fmt.Printf("\n\nGot handle %v, cancel func %v\n\n", c.handle, cancel)
log.Printf("Got handle %v, cancel func %v", c.handle, cancel)
c.setError(cancel())
delete(handlesToCancel, c.handle)
return c
Expand Down Expand Up @@ -99,32 +100,62 @@ func writeToCharBuf(buf []C.char, gostring string) {
}
}

// errTerminatedEarly is the sentinel error reported when the collector stops
// on its own without returning an error of its own; the message directs the
// reader to the logs for the underlying cause.
var errTerminatedEarly = errors.New(
	"collector terminated unexpectedly without error, check logs for details",
)

func mainNonBlocking(ctx context.Context, yamlUri string) (func() error, error) {
ctx, cancel := context.WithCancel(ctx)
col, err := createOtelcol(ctx, yamlUri)
if err != nil {
return nil, err
}

done := make(chan error)
// Run collector in background
go func() {
defer cancel()
done <- main2(ctx, yamlUri)
// run with detached context
done <- col.Run(context.WithoutCancel(ctx))
}()

// Wait a short time to see if we fail fast, otherwise return without error to the caller
t := time.NewTimer(time.Millisecond * 50)
select {
case err := <-done:
shutdown := func() error {
log.Print("Starting shutdown")
col.Shutdown()
log.Print("Shutdown done")
defer log.Print("Receiver return value in channel")
return <-done
}

// Poll collector state until running. Unfortunately there is no channel to subscribe to
// state changes
ticker := time.NewTicker(time.Millisecond * 5)
for col.GetState() == otelcol.StateStarting {
select {
case <-ticker.C:
case <-ctx.Done():
log.Printf("while waiting for collector to start: %v", ctx.Err().Error())
return nil, errors.Join(
fmt.Errorf("collector never entered running state (state=%v): %w", col.GetState(), ctx.Err()),
shutdown(),
)
case err := <-done:
if err != nil {
return nil, err
}
return nil, errTerminatedEarly
}
}

// check if passed Running
if col.GetState() >= otelcol.StateClosing {
err := <-done
if err != nil {
return nil, err
}
return nil, errors.New("collector terminated early without error, check logs for more info")
case <-t.C:
return func() error {
cancel()
return <-done
}, nil
return nil, errTerminatedEarly
}
}

func main2(ctx context.Context, yamlUri string) error {
return shutdown, nil
}

func createOtelcol(ctx context.Context, yamlUri string) (*otelcol.Collector, error) {
info := component.BuildInfo{
Command: "otelcol-contrib",
Description: "OpenTelemetry Collector Contrib",
Expand All @@ -134,6 +165,8 @@ func main2(ctx context.Context, yamlUri string) error {
set := otelcol.CollectorSettings{
BuildInfo: info,
Factories: components,
// Let the python process handle gracefully shutting down
DisableGracefulShutdown: true,
ConfigProviderSettings: otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{yamlUri},
Expand All @@ -151,12 +184,14 @@ func main2(ctx context.Context, yamlUri string) error {
},
}

return runInteractive2(ctx, set)
}
col, err := otelcol.NewCollector(set)
if err != nil {
return nil, err
}

func runInteractive2(ctx context.Context, params otelcol.CollectorSettings) error {
cmd := otelcol.NewCommand(params)
// Unset args
cmd.SetArgs([]string{})
return cmd.ExecuteContext(ctx)
// Fail fast
if err := col.DryRun(ctx); err != nil {
return nil, err
}
return col, nil
}
63 changes: 0 additions & 63 deletions opentelemetry-otelcol/go/shared/shared.go

This file was deleted.

2 changes: 2 additions & 0 deletions opentelemetry-otelcol/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ zipp==3.17.0
-e opentelemetry-semantic-conventions
-e tests/opentelemetry-test-utils
-e opentelemetry-api
-e exporter/opentelemetry-exporter-otlp-proto-grpc
-e exporter/opentelemetry-exporter-otlp-proto-common/
29 changes: 22 additions & 7 deletions opentelemetry-otelcol/tests/test_otelcol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from time import sleep
from unittest import TestCase

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.otelcol import Collector
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


class TestOtelCol(TestCase):
Expand All @@ -28,31 +34,40 @@ def test_otelcolmain(self) -> None:
otlp:
protocols:
grpc:
# http:
http:
processors:
batch:
exporters:
otlp:
endpoint: otelcol:4317
debug:
verbosity: detailed
googlecloud:
project: ${env:PROJECT_ID}
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
exporters: [debug, googlecloud]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
exporters: [debug]
logs:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
exporters: [debug]
"""
)
)
sleep(1)

tp = TracerProvider(
resource=Resource.create({"service.name": "foobar"})
)
tp.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
with tp.get_tracer("foo").start_as_current_span("example-span"):
sleep(0.1)
tp.shutdown()
col.shutdown()

0 comments on commit 2347609

Please sign in to comment.