Skip to content

Commit 4ae8b30

Browse files
authored
feature(trace): add opentelemetry (#25)
* feature(trace): add opentelemetry * feature(trace): add opentelemetry
1 parent 53dc3ac commit 4ae8b30

File tree

14 files changed

+326
-30
lines changed

14 files changed

+326
-30
lines changed

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,7 @@ A high performance MQTT broker
99
[![GitHub](https://img.shields.io/github/license/yunqi/lighthouse)](https://github.com/yunqi/lighthouse/blob/master/LICENSE)
1010
[![GitHub stars](https://img.shields.io/github/stars/yunqi/lighthouse)](https://github.com/yunqi/lighthouse/stargazers)
1111
[![GitHub pull requests](https://img.shields.io/github/issues-pr-raw/yunqi/lighthouse)](https://github.com/yunqi/lighthouse/pulls)
12+
13+
```shell
14+
docker run -d --rm --name jaeger -p6831:6831/udp -p16686:16686 jaegertracing/all-in-one:1.30
15+
```

cmd/lighthouse/config.yaml

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,8 @@ persistence:
99
type: memory
1010
queue:
1111
type: memory
12-
12+
trace:
13+
name: lighthouse
14+
endpoint: http://localhost:14268/api/traces
15+
sampler: 1.0
16+
batcher: jaeger

cmd/lighthouse/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
_ "github.com/yunqi/lighthouse/internal/persistence/session/redis"
99
"github.com/yunqi/lighthouse/internal/server"
1010
"github.com/yunqi/lighthouse/internal/xlog"
11+
"github.com/yunqi/lighthouse/internal/xtrace"
1112
"gopkg.in/yaml.v3"
1213
"net/http"
1314
_ "net/http/pprof"
@@ -32,6 +33,9 @@ func main() {
3233
if err != nil {
3334
panic(err)
3435
}
36+
37+
xtrace.StartAgent(&c.Trace)
38+
3539
go func() {
3640
_ = http.ListenAndServe("localhost:6060", nil)
3741
}()

config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Config struct {
2424
Mqtt Mqtt `yaml:"mqtt"`
2525
Log Log `yaml:"log"`
2626
Persistence Persistence `yaml:"persistence"`
27+
Trace Trace `yaml:"trace"`
2728
}
2829

2930
type Mqtt struct {

config/trace.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package config
2+
3+
type Trace struct {
4+
Name string `yaml:"name"`
5+
Endpoint string `yaml:"endpoint"`
6+
Sampler float64 `yaml:"sampler"`
7+
Batcher string `yaml:"batcher" validate:"eq=jaeger|eq=zipkin"` //jaeger|zipkin
8+
}

go.mod

+11-2
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,23 @@ require (
2121
github.com/cespare/xxhash/v2 v2.1.2 // indirect
2222
github.com/davecgh/go-spew v1.1.1 // indirect
2323
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
24+
github.com/go-logr/logr v1.2.1 // indirect
25+
github.com/go-logr/stdr v1.2.0 // indirect
2426
github.com/go-playground/locales v0.14.0 // indirect
2527
github.com/go-playground/universal-translator v0.18.0 // indirect
2628
github.com/leodido/go-urn v1.2.1 // indirect
29+
github.com/openzipkin/zipkin-go v0.3.0 // indirect
2730
github.com/pmezard/go-difflib v1.0.0 // indirect
31+
go.opentelemetry.io/otel v1.3.0 // indirect
32+
go.opentelemetry.io/otel/exporters/jaeger v1.3.0 // indirect
33+
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.3.0 // indirect
34+
go.opentelemetry.io/otel/exporters/zipkin v1.3.0 // indirect
35+
go.opentelemetry.io/otel/sdk v1.3.0 // indirect
36+
go.opentelemetry.io/otel/trace v1.3.0 // indirect
2837
go.uber.org/atomic v1.7.0 // indirect
2938
go.uber.org/multierr v1.6.0 // indirect
30-
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
39+
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
3140
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
3241
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe // indirect
33-
golang.org/x/text v0.3.6 // indirect
42+
golang.org/x/text v0.3.7 // indirect
3443
)

go.sum

+130
Large diffs are not rendered by default.

internal/packet/connect.go

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package packet
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"fmt"
2223
"github.com/yunqi/lighthouse/internal/code"
2324
"github.com/yunqi/lighthouse/internal/xerror"
@@ -27,6 +28,8 @@ import (
2728
type (
2829
// Connect represents the MQTT Connect packet.
2930
Connect struct {
31+
ctx context.Context
32+
3033
Version Version
3134
FixedHeader *FixedHeader
3235

@@ -68,6 +71,10 @@ type (
6871
}
6972
)
7073

74+
func (c *Connect) Context() context.Context {
75+
return c.ctx
76+
}
77+
7178
// NewConnect returns a Connect instance by the given FixHeader and io.Reader
7279
func NewConnect(fixedHeader *FixedHeader, version Version, r io.Reader) (*Connect, error) {
7380
//b1 := buffer[0] //一定是16

internal/packet/packet.go

+1
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ type (
168168
Decode(r io.Reader) (err error)
169169
// String is mainly used in logging, debugging and testing.
170170
String() string
171+
//Context() context.Context
171172
}
172173
)
173174

internal/server/client.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package server
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"github.com/chenquan/go-pkg/xio"
@@ -126,6 +127,7 @@ type (
126127
wg sync.WaitGroup
127128
queueStore queue.Queue
128129
limit *packetIdLimiter
130+
ctx context.Context
129131
log *zap.Logger
130132
}
131133
)
@@ -183,6 +185,8 @@ func (c *client) Disconnect(disconnect *packet.Disconnect) {
183185
func newClient(server *server, conn net.Conn) *client {
184186
reader := xio.GetBufferReaderSize(conn, 2048)
185187
writer := xio.GetBufferWriterSize(conn, 2048)
188+
ctx, span := server.tracer.Start(context.Background(), conn.RemoteAddr().String())
189+
defer span.End()
186190

187191
c := &client{
188192
server: server,
@@ -196,7 +200,8 @@ func newClient(server *server, conn net.Conn) *client {
196200
out: make(chan packet.Packet, 8),
197201
closed: make(chan struct{}),
198202
connected: make(chan struct{}),
199-
log: xlog.LoggerModule("client"),
203+
ctx: ctx,
204+
log: xlog.LoggerWithContext(ctx, "client"),
200205
}
201206
return c
202207
}
@@ -550,3 +555,7 @@ func (c *client) pollInFlights() (bool, error) {
550555
func (c *client) newPacketIdLimiter(limit uint16) {
551556
c.limit = newPacketIDLimiter(limit)
552557
}
558+
559+
func (c *client) Context() context.Context {
560+
return c.ctx
561+
}

internal/server/server.go

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
"github.com/yunqi/lighthouse/internal/persistence"
2525
"github.com/yunqi/lighthouse/internal/persistence/session"
2626
"github.com/yunqi/lighthouse/internal/xlog"
27+
"github.com/yunqi/lighthouse/internal/xtrace"
28+
"go.opentelemetry.io/otel"
29+
"go.opentelemetry.io/otel/trace"
2730
"go.uber.org/zap"
2831
"net"
2932
"time"
@@ -48,6 +51,7 @@ type (
4851
websocketListener *websocket.Conn
4952
sessions session.Store
5053
log *zap.Logger
54+
tracer trace.Tracer
5155
}
5256
)
5357

@@ -87,6 +91,9 @@ func loadServerOptions(opts ...Option) *Options {
8791
}
8892

8993
func (s *server) ServeTCP() {
94+
//propagator := otel.GetTextMapPropagator()
95+
s.tracer = otel.GetTracerProvider().Tracer(xtrace.Name)
96+
9097
defer func() {
9198
err := s.tcpListener.Close()
9299
if err != nil {

internal/xlog/log.go

+45-1
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,63 @@
1717
package xlog
1818

1919
import (
20+
"context"
2021
"github.com/yunqi/lighthouse/config"
22+
"go.opentelemetry.io/otel/trace"
2123
"go.uber.org/zap"
2224
"go.uber.org/zap/zapcore"
2325
"gopkg.in/natefinch/lumberjack.v2"
2426
"os"
2527
)
2628

2729
var logger = zap.NewNop()
30+
var Info = logger.Info
31+
var Panic = logger.Panic
32+
var Error = logger.Error
33+
var Warn = logger.Warn
34+
var Debug = logger.Debug
35+
var Fatal = logger.Fatal
36+
37+
const TraceName = "lighthouse"
2838

2939
// LoggerModule release fields to a new logger.
3040
// Plugins can use this method to release plugin name field.
3141
func LoggerModule(moduleName string) *zap.Logger {
32-
return logger.With(zap.String("moduleName", moduleName))
42+
43+
return logger.With(
44+
zap.String("moduleName", moduleName),
45+
)
46+
}
47+
48+
// LoggerWithContext release fields to a new logger.
49+
// Plugins can use this method to release plugin name field.
50+
func LoggerWithContext(ctx context.Context, moduleName string) *zap.Logger {
51+
spanId := spanIdFromContext(ctx)
52+
straceId := traceIdFromContext(ctx)
53+
54+
return logger.With(
55+
zap.String("moduleName", moduleName),
56+
zap.String("traceId", straceId),
57+
zap.String("spanId", spanId),
58+
)
59+
}
60+
61+
func spanIdFromContext(ctx context.Context) string {
62+
spanCtx := trace.SpanContextFromContext(ctx)
63+
if spanCtx.HasSpanID() {
64+
return spanCtx.SpanID().String()
65+
}
66+
67+
return ""
68+
}
69+
70+
func traceIdFromContext(ctx context.Context) string {
71+
spanCtx := trace.SpanContextFromContext(ctx)
72+
if spanCtx.HasTraceID() {
73+
return spanCtx.TraceID().String()
74+
}
75+
76+
return ""
3377
}
3478

3579
func InitLogger(c *config.Log) (err error) {

internal/xtrace/trace.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package xtrace
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/yunqi/lighthouse/config"
7+
"github.com/yunqi/lighthouse/internal/xlog"
8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/exporters/jaeger"
10+
"go.opentelemetry.io/otel/exporters/zipkin"
11+
"go.opentelemetry.io/otel/propagation"
12+
"go.opentelemetry.io/otel/sdk/resource"
13+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
14+
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
15+
"go.uber.org/zap"
16+
"sync"
17+
)
18+
19+
const Name = "lighthouse"
20+
21+
const (
22+
Jaeger = "jaeger"
23+
Zipkin = "zipkin"
24+
)
25+
26+
var (
27+
ErrUnknownExporter = errors.New("unknown exporter error")
28+
)
29+
30+
var (
31+
agents = make(map[string]struct{})
32+
lock sync.Mutex
33+
)
34+
35+
// StartAgent starts a opentelemetry agent.
36+
func StartAgent(c *config.Trace) {
37+
lock.Lock()
38+
defer lock.Unlock()
39+
40+
_, ok := agents[c.Endpoint]
41+
if ok {
42+
return
43+
}
44+
45+
// if error happens, let later calls run.
46+
if err := startAgent(c); err != nil {
47+
return
48+
}
49+
50+
agents[c.Endpoint] = struct{}{}
51+
}
52+
func startAgent(c *config.Trace) error {
53+
opts := []sdktrace.TracerProviderOption{
54+
// Set the sampling rate based on the parent span to 100%
55+
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))),
56+
// Record information about this application in a Resource.
57+
sdktrace.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String(c.Name))),
58+
}
59+
60+
if len(c.Endpoint) > 0 {
61+
exp, err := createExporter(c)
62+
if err != nil {
63+
xlog.Panic("opentelemetry exporter err", zap.Error(err))
64+
return err
65+
}
66+
67+
// Always be sure to batch in production.
68+
opts = append(opts, sdktrace.WithBatcher(exp))
69+
}
70+
71+
tp := sdktrace.NewTracerProvider(opts...)
72+
otel.SetTracerProvider(tp)
73+
74+
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
75+
propagation.TraceContext{}, propagation.Baggage{}))
76+
77+
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
78+
xlog.Error("[opentelemetry] error", zap.Error(err))
79+
}))
80+
81+
return nil
82+
}
83+
func createExporter(c *config.Trace) (sdktrace.SpanExporter, error) {
84+
// Just support jaeger and zipkin now, more for later
85+
switch c.Batcher {
86+
case Jaeger:
87+
return jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(c.Endpoint)))
88+
case Zipkin:
89+
return zipkin.New(c.Endpoint)
90+
default:
91+
return nil, fmt.Errorf("%w: %s", ErrUnknownExporter, c.Batcher)
92+
}
93+
}

server/server.go

-25
This file was deleted.

0 commit comments

Comments
 (0)