Skip to content

Commit

Permalink
Properly handle shutdown the collector gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Jun 28, 2024
1 parent d6dde88 commit b47320d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 106 deletions.
120 changes: 86 additions & 34 deletions opentelemetry-otelcol/go/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"
Expand All @@ -29,42 +30,57 @@ import (
)

var (
mu = sync.Mutex{}
handlesToCancel = map[C.uint]func() error{}
mu = sync.Mutex{}
handlesToShutdown = map[C.uint]func(context.Context) error{}
)

//export NewCollector
func NewCollector(yaml *C.char) collectorInstance {
func NewCollector(yaml *C.char, timeoutMillis uint) collectorInstance {
mu.Lock()
defer mu.Unlock()
yamlUri := "yaml:" + C.GoString(yaml)
inst := newCollectorInstance(nil)

cancel, err := mainNonBlocking(context.Background(), yamlUri)
ctx := context.Background()
if timeoutMillis > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*time.Duration(timeoutMillis))
defer cancel()
}

shutdown, err := mainNonBlocking(ctx, yamlUri)
if err != nil {
inst.setError(err)
return inst
}

handlesToCancel[inst.handle] = cancel
handlesToShutdown[inst.handle] = shutdown

return inst
}

//export ShutdownCollector
func ShutdownCollector(c collectorInstance) collectorInstance {
func ShutdownCollector(c collectorInstance, timeoutMillis uint) collectorInstance {
mu.Lock()
defer mu.Unlock()
fmt.Printf("Stopping collector handle ID: %v\n", c.handle)
cancel, ok := handlesToCancel[c.handle]

ctx := context.Background()
if timeoutMillis > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*time.Duration(timeoutMillis))
defer cancel()
}

log.Printf("Stopping collector handle ID: %v", c.handle)
cancel, ok := handlesToShutdown[c.handle]
if !ok {
c.setError(fmt.Errorf("CollectorInstance with handle %v is not known", c.handle))
return c
}

fmt.Printf("\n\nGot handle %v, cancel func %v\n\n", c.handle, cancel)
c.setError(cancel())
delete(handlesToCancel, c.handle)
log.Printf("Got handle %v, cancel func %v", c.handle, cancel)
c.setError(cancel(ctx))
delete(handlesToShutdown, c.handle)
return c
}

Expand Down Expand Up @@ -99,32 +115,64 @@ func writeToCharBuf(buf []C.char, gostring string) {
}
}

func mainNonBlocking(ctx context.Context, yamlUri string) (func() error, error) {
ctx, cancel := context.WithCancel(ctx)
var errTerminatedEarly = errors.New("collector terminated unexpectedly without error, check logs for details")

func mainNonBlocking(ctx context.Context, yamlUri string) (func(context.Context) error, error) {
col, err := createOtelcol(ctx, yamlUri)
if err != nil {
return nil, err
}

done := make(chan error)
// Run collector in background
go func() {
defer cancel()
done <- main2(ctx, yamlUri)
// run with detached context
done <- col.Run(context.WithoutCancel(ctx))
}()

// Wait a short time to see if we fail fast, otherwise return without error to the caller
t := time.NewTimer(time.Millisecond * 50)
select {
case err := <-done:
shutdown := func(ctx context.Context) error {
log.Print("Starting shutdown")
col.Shutdown()
log.Print("Shutdown done")
defer log.Print("Receiver return value in channel")
select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
}
}

// Poll collector state until running. Unfortunately there is no channel to subscribe to
// state changes
ticker := time.NewTicker(time.Millisecond * 5)
for col.GetState() == otelcol.StateStarting {
select {
case <-ticker.C:
case <-ctx.Done():
log.Printf("while waiting for collector to start: %v", ctx.Err().Error())
return nil, fmt.Errorf("collector never entered running state (state=%v): %w", col.GetState(), ctx.Err())
case err := <-done:
if err != nil {
return nil, err
}
return nil, errTerminatedEarly
}
}

// check if passed Running
if col.GetState() >= otelcol.StateClosing {
err := <-done
if err != nil {
return nil, err
}
return nil, errors.New("collector terminated early without error, check logs for more info")
case <-t.C:
return func() error {
cancel()
return <-done
}, nil
return nil, errTerminatedEarly
}
}

func main2(ctx context.Context, yamlUri string) error {
return shutdown, nil
}

func createOtelcol(ctx context.Context, yamlUri string) (*otelcol.Collector, error) {
info := component.BuildInfo{
Command: "otelcol-contrib",
Description: "OpenTelemetry Collector Contrib",
Expand All @@ -134,6 +182,8 @@ func main2(ctx context.Context, yamlUri string) error {
set := otelcol.CollectorSettings{
BuildInfo: info,
Factories: components,
// Let the python process handle gracefully shutting down
DisableGracefulShutdown: true,
ConfigProviderSettings: otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{yamlUri},
Expand All @@ -151,12 +201,14 @@ func main2(ctx context.Context, yamlUri string) error {
},
}

return runInteractive2(ctx, set)
}
col, err := otelcol.NewCollector(set)
if err != nil {
return nil, err
}

func runInteractive2(ctx context.Context, params otelcol.CollectorSettings) error {
cmd := otelcol.NewCommand(params)
// Unset args
cmd.SetArgs([]string{})
return cmd.ExecuteContext(ctx)
// Fail fast
if err := col.DryRun(ctx); err != nil {
return nil, err
}
return col, nil
}
63 changes: 0 additions & 63 deletions opentelemetry-otelcol/go/shared/shared.go

This file was deleted.

13 changes: 11 additions & 2 deletions opentelemetry-otelcol/src/opentelemetry/otelcol/_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import ctypes
import pathlib

DEFAULT_TIMEOUT_MILLIS = 2000

logger = logging.getLogger(__name__)

sopath = pathlib.Path(__file__).parent / "otelcolcontrib.so"
Expand All @@ -42,6 +44,7 @@ def __repr__(self) -> str:
return f"_CollectorInstance(err={self.err}, handle={self.handle})"


otelcolcontrib.NewCollector.argtypes = [ctypes.c_char_p, ctypes.c_uint32]
otelcolcontrib.NewCollector.restype = _CollectorInstance
otelcolcontrib.ShutdownCollector.argtypes = [_CollectorInstance]
otelcolcontrib.ShutdownCollector.restype = _CollectorInstance
Expand All @@ -52,10 +55,16 @@ def __init__(self, config_yaml: str) -> None:
config_bytes = config_yaml.encode()

self._inst: _CollectorInstance = otelcolcontrib.NewCollector(
config_bytes
config_bytes,
# timeout millis,
2000,
)
self._inst.check_error()

def shutdown(self) -> None:
self._inst = otelcolcontrib.ShutdownCollector(self._inst)
self._inst = otelcolcontrib.ShutdownCollector(
self._inst,
# timeout millis,
10_000,
)
self._inst.check_error()
2 changes: 2 additions & 0 deletions opentelemetry-otelcol/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ zipp==3.17.0
-e opentelemetry-semantic-conventions
-e tests/opentelemetry-test-utils
-e opentelemetry-api
-e exporter/opentelemetry-exporter-otlp-proto-grpc
-e exporter/opentelemetry-exporter-otlp-proto-common/
29 changes: 22 additions & 7 deletions opentelemetry-otelcol/tests/test_otelcol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from time import sleep
from unittest import TestCase

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.otelcol import Collector
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


class TestOtelCol(TestCase):
Expand All @@ -28,31 +34,40 @@ def test_otelcolmain(self) -> None:
otlp:
protocols:
grpc:
# http:
http:
processors:
batch:
exporters:
otlp:
endpoint: otelcol:4317
debug:
verbosity: detailed
googlecloud:
project: ${env:PROJECT_ID}
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
exporters: [debug, googlecloud]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
exporters: [debug]
logs:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
exporters: [debug]
"""
)
)
sleep(1)

tp = TracerProvider(
resource=Resource.create({"service.name": "foobar"})
)
tp.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
with tp.get_tracer("foo").start_as_current_span("example-span"):
sleep(0.1)
tp.shutdown()
col.shutdown()

0 comments on commit b47320d

Please sign in to comment.