Skip to content

Commit 0169d1f

Browse files
committed
Add delayq message format
1 parent b68826e commit 0169d1f

File tree

3 files changed

+81
-2
lines changed

3 files changed

+81
-2
lines changed

delayq/message.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package delayq
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/gob"
7+
8+
ztrace "github.com/zeromicro/go-zero/core/trace"
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/propagation"
11+
"go.opentelemetry.io/otel/trace"
12+
)
13+
14+
type (
15+
Action string
16+
Message struct {
17+
Carrier propagation.MapCarrier
18+
Action Action
19+
Body []byte
20+
}
21+
)
22+
23+
func (m *Message) Encode() ([]byte, error) {
24+
var buf bytes.Buffer
25+
enc := gob.NewEncoder(&buf)
26+
if err := enc.Encode(m); err != nil {
27+
return nil, err
28+
}
29+
return buf.Bytes(), nil
30+
}
31+
32+
func (m *Message) Inject(ctx context.Context) (context.Context, trace.Span) {
33+
tracer := otel.Tracer(ztrace.TraceName)
34+
spanCtx, span := tracer.Start(ctx, string(m.Action), trace.WithSpanKind(trace.SpanKindProducer))
35+
m.Carrier = propagation.MapCarrier{}
36+
propagator := otel.GetTextMapPropagator()
37+
propagator.Inject(spanCtx, m.Carrier)
38+
return spanCtx, span
39+
}
40+
41+
func (m *Message) Decode(data []byte) error {
42+
buf := bytes.NewBuffer(data)
43+
dec := gob.NewDecoder(buf)
44+
return dec.Decode(m)
45+
}
46+
47+
func (m *Message) Extract(ctx context.Context) (context.Context, trace.Span) {
48+
propagator := otel.GetTextMapPropagator()
49+
ctx = propagator.Extract(ctx, m.Carrier)
50+
tracer := otel.Tracer(ztrace.TraceName)
51+
spanCtx, span := tracer.Start(ctx, string(m.Action), trace.WithSpanKind(trace.SpanKindConsumer))
52+
return spanCtx, span
53+
}

delayq/message_test.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package delayq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestMessageEncodeDecode(t *testing.T) {
10+
msg := Message{
11+
Carrier: map[string]string{
12+
"key1": "value1",
13+
"key2": "value2",
14+
},
15+
Action: "action1",
16+
Body: []byte("body1"),
17+
}
18+
data, err := msg.Encode()
19+
assert.NoError(t, err)
20+
21+
newMsg := Message{}
22+
err = newMsg.Decode(data)
23+
assert.NoError(t, err)
24+
25+
assert.Exactly(t, msg, newMsg)
26+
}

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ require (
88
github.com/gorilla/sessions v1.3.0
99
github.com/stretchr/testify v1.9.0
1010
github.com/zeromicro/go-zero v1.7.0
11+
go.opentelemetry.io/otel v1.24.0
12+
go.opentelemetry.io/otel/trace v1.24.0
1113
xorm.io/xorm v1.3.9
1214
)
1315

@@ -38,7 +40,6 @@ require (
3840
github.com/robfig/cron/v3 v3.0.1 // indirect
3941
github.com/spaolacci/murmur3 v1.1.0 // indirect
4042
github.com/syndtr/goleveldb v1.0.0 // indirect
41-
go.opentelemetry.io/otel v1.24.0 // indirect
4243
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
4344
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
4445
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect
@@ -47,7 +48,6 @@ require (
4748
go.opentelemetry.io/otel/exporters/zipkin v1.24.0 // indirect
4849
go.opentelemetry.io/otel/metric v1.24.0 // indirect
4950
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
50-
go.opentelemetry.io/otel/trace v1.24.0 // indirect
5151
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
5252
go.uber.org/automaxprocs v1.5.3 // indirect
5353
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect

0 commit comments

Comments
 (0)