Skip to content

Commit 7019f29

Browse files
Chief-Rishabravisuhag
authored andcommitted
feat: instrument grpc interceptor with OpenTelemetry
1 parent 8f3a769 commit 7019f29

File tree

28 files changed

+516
-189
lines changed

28 files changed

+516
-189
lines changed

.github/workflows/build.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
- name: Set up Go
2020
uses: actions/setup-go@v3
2121
with:
22-
go-version: "1.18"
22+
go-version: "1.20"
2323
cache: true
2424
check-latest: true
2525
- name: Get release tag

.github/workflows/release.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
- name: Set up Go
2020
uses: actions/setup-go@v3
2121
with:
22-
go-version: '^1.18'
22+
go-version: '^1.20'
2323
cache: true
2424
check-latest: true
2525
- name: Login to DockerHub

.github/workflows/test.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
- name: Set up Go
1515
uses: actions/setup-go@v3
1616
with:
17-
go-version: '1.18'
17+
go-version: '1.20'
1818
cache: true
1919
- name: Install dependencies
2020
run: sudo apt-get install build-essential
@@ -53,7 +53,7 @@ jobs:
5353
- name: Set up Go
5454
uses: actions/setup-go@v3
5555
with:
56-
go-version: '1.18'
56+
go-version: '1.20'
5757
cache: true
5858
- name: Install dependencies
5959
run: sudo apt-get install build-essential
@@ -74,7 +74,7 @@ jobs:
7474
- name: Set up Go
7575
uses: actions/setup-go@v3
7676
with:
77-
go-version: '1.18'
77+
go-version: '1.20'
7878
cache: true
7979
- name: Download coverage
8080
uses: actions/download-artifact@v3

cmd/lint.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func LintCmd() *cobra.Command {
2828
return &cobra.Command{
2929
Use: "lint [path]",
3030
Aliases: []string{"l"},
31-
Args: cobra.MatchAll(cobra.ExactArgs(1)),
31+
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
3232
Short: "Check for issues in recipes",
3333
Long: heredoc.Doc(`
3434
Check for issues specified recipes.
@@ -193,14 +193,12 @@ func printConfigError(rcp recipe.Recipe, pluginNode recipe.PluginNode, err plugi
193193
}
194194

195195
// findPluginByName checks plugin by provided name
196-
func findPluginByName(plugins []recipe.PluginRecipe, name string) (plugin recipe.PluginRecipe, exists bool) {
197-
for _, p := range plugins {
196+
func findPluginByName(pp []recipe.PluginRecipe, name string) (recipe.PluginRecipe, bool) {
197+
for _, p := range pp {
198198
if p.Name == name {
199-
exists = true
200-
plugin = p
201-
return
199+
return p, true
202200
}
203201
}
204202

205-
return
203+
return recipe.PluginRecipe{}, false
206204
}

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/raystack/meteor
22

3-
go 1.18
3+
go 1.20
44

55
require (
66
cloud.google.com/go/bigquery v1.52.0
@@ -142,7 +142,7 @@ require (
142142
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
143143
github.com/google/uuid v1.3.0 // indirect
144144
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
145-
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
145+
github.com/googleapis/gax-go/v2 v2.12.0
146146
github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect
147147
github.com/gorilla/css v1.0.0 // indirect
148148
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.1 // indirect

metrics/otelgrpc/otelgrpc.go

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package otelgrpc
2+
3+
import (
4+
"context"
5+
"net"
6+
"strings"
7+
"time"
8+
9+
"github.com/raystack/meteor/utils"
10+
"go.opentelemetry.io/otel"
11+
"go.opentelemetry.io/otel/attribute"
12+
"go.opentelemetry.io/otel/metric"
13+
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/peer"
16+
"google.golang.org/protobuf/proto"
17+
)
18+
19+
type UnaryParams struct {
20+
Start time.Time
21+
Method string
22+
Req any
23+
Res any
24+
Err error
25+
}
26+
27+
type Monitor struct {
28+
duration metric.Int64Histogram
29+
requestSize metric.Int64Histogram
30+
responseSize metric.Int64Histogram
31+
attributes []attribute.KeyValue
32+
}
33+
34+
func NewOtelGRPCMonitor(hostName string) Monitor {
35+
meter := otel.Meter("github.com/raystack/meteor/metrics/otelgrpc")
36+
37+
duration, err := meter.Int64Histogram("rpc.client.duration", metric.WithUnit("ms"))
38+
handleOtelErr(err)
39+
40+
requestSize, err := meter.Int64Histogram("rpc.client.request.size", metric.WithUnit("By"))
41+
handleOtelErr(err)
42+
43+
responseSize, err := meter.Int64Histogram("rpc.client.response.size", metric.WithUnit("By"))
44+
handleOtelErr(err)
45+
46+
addr, port := ExtractAddress(hostName)
47+
48+
return Monitor{
49+
duration: duration,
50+
requestSize: requestSize,
51+
responseSize: responseSize,
52+
attributes: []attribute.KeyValue{
53+
semconv.RPCSystemGRPC,
54+
attribute.String("network.transport", "tcp"),
55+
attribute.String("server.address", addr),
56+
attribute.String("server.port", port),
57+
},
58+
}
59+
}
60+
61+
func GetProtoSize(p any) int {
62+
if p == nil {
63+
return 0
64+
}
65+
66+
size := proto.Size(p.(proto.Message))
67+
return size
68+
}
69+
70+
func (m *Monitor) RecordUnary(ctx context.Context, p UnaryParams) {
71+
reqSize := GetProtoSize(p.Req)
72+
resSize := GetProtoSize(p.Res)
73+
74+
attrs := make([]attribute.KeyValue, len(m.attributes))
75+
copy(attrs, m.attributes)
76+
attrs = append(attrs, attribute.String("rpc.grpc.status_text", utils.StatusText(p.Err)))
77+
attrs = append(attrs, attribute.String("network.type", netTypeFromCtx(ctx)))
78+
attrs = append(attrs, ParseFullMethod(p.Method)...)
79+
80+
m.duration.Record(ctx,
81+
time.Since(p.Start).Milliseconds(),
82+
metric.WithAttributes(attrs...))
83+
84+
m.requestSize.Record(ctx,
85+
int64(reqSize),
86+
metric.WithAttributes(attrs...))
87+
88+
m.responseSize.Record(ctx,
89+
int64(resSize),
90+
metric.WithAttributes(attrs...))
91+
}
92+
93+
func (m *Monitor) RecordStream(ctx context.Context, start time.Time, method string, err error) {
94+
attrs := make([]attribute.KeyValue, len(m.attributes))
95+
copy(attrs, m.attributes)
96+
attrs = append(attrs, attribute.String("rpc.grpc.status_text", utils.StatusText(err)))
97+
attrs = append(attrs, attribute.String("network.type", netTypeFromCtx(ctx)))
98+
attrs = append(attrs, ParseFullMethod(method)...)
99+
100+
m.duration.Record(ctx,
101+
time.Since(start).Milliseconds(),
102+
metric.WithAttributes(attrs...))
103+
}
104+
105+
func (m *Monitor) UnaryClientInterceptor() grpc.UnaryClientInterceptor {
106+
return func(ctx context.Context,
107+
method string,
108+
req, reply interface{},
109+
cc *grpc.ClientConn,
110+
invoker grpc.UnaryInvoker,
111+
opts ...grpc.CallOption,
112+
) (err error) {
113+
defer func(start time.Time) {
114+
m.RecordUnary(ctx, UnaryParams{
115+
Start: start,
116+
Req: req,
117+
Res: reply,
118+
Err: err,
119+
})
120+
}(time.Now())
121+
122+
return invoker(ctx, method, req, reply, cc, opts...)
123+
}
124+
}
125+
126+
func (m *Monitor) StreamClientInterceptor() grpc.StreamClientInterceptor {
127+
return func(ctx context.Context,
128+
desc *grpc.StreamDesc,
129+
cc *grpc.ClientConn,
130+
method string,
131+
streamer grpc.Streamer,
132+
opts ...grpc.CallOption,
133+
) (s grpc.ClientStream, err error) {
134+
defer func(start time.Time) {
135+
m.RecordStream(ctx, start, method, err)
136+
}(time.Now())
137+
138+
return streamer(ctx, desc, cc, method, opts...)
139+
}
140+
}
141+
142+
func (m *Monitor) GetAttributes() []attribute.KeyValue {
143+
return m.attributes
144+
}
145+
146+
func ParseFullMethod(fullMethod string) []attribute.KeyValue {
147+
name := strings.TrimLeft(fullMethod, "/")
148+
service, method, found := strings.Cut(name, "/")
149+
if !found {
150+
return nil
151+
}
152+
153+
var attrs []attribute.KeyValue
154+
if service != "" {
155+
attrs = append(attrs, semconv.RPCService(service))
156+
}
157+
if method != "" {
158+
attrs = append(attrs, semconv.RPCMethod(method))
159+
}
160+
return attrs
161+
}
162+
163+
func handleOtelErr(err error) {
164+
if err != nil {
165+
otel.Handle(err)
166+
}
167+
}
168+
169+
func ExtractAddress(addr string) (host, port string) {
170+
host, port, err := net.SplitHostPort(addr)
171+
if err != nil {
172+
return addr, "80"
173+
}
174+
175+
return host, port
176+
}
177+
178+
func netTypeFromCtx(ctx context.Context) (ipType string) {
179+
ipType = "unknown"
180+
p, ok := peer.FromContext(ctx)
181+
if !ok {
182+
return ipType
183+
}
184+
185+
clientIP, _, err := net.SplitHostPort(p.Addr.String())
186+
if err != nil {
187+
return ipType
188+
}
189+
190+
ip := net.ParseIP(clientIP)
191+
if ip.To4() != nil {
192+
ipType = "ipv4"
193+
} else if ip.To16() != nil {
194+
ipType = "ipv6"
195+
}
196+
197+
return ipType
198+
}

metrics/otelgrpc/otelgrpc_test.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package otelgrpc_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"reflect"
7+
"testing"
8+
"time"
9+
10+
"github.com/raystack/meteor/metrics/otelgrpc"
11+
pb "github.com/raystack/optimus/protos/raystack/optimus/core/v1beta1"
12+
"github.com/stretchr/testify/assert"
13+
"go.opentelemetry.io/otel/attribute"
14+
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
15+
)
16+
17+
func Test_otelGRPCMonitor_Record(t *testing.T) {
18+
mt := otelgrpc.NewOtelGRPCMonitor("localhost:1001")
19+
assert.NotNil(t, mt)
20+
initialAttr := mt.GetAttributes()
21+
22+
uc := mt.UnaryClientInterceptor()
23+
assert.NotNil(t, uc)
24+
assert.Equal(t, initialAttr, mt.GetAttributes())
25+
26+
sc := mt.StreamClientInterceptor()
27+
assert.NotNil(t, sc)
28+
assert.Equal(t, initialAttr, mt.GetAttributes())
29+
30+
mt.RecordUnary(context.Background(), otelgrpc.UnaryParams{
31+
Start: time.Now(),
32+
Method: "/service.raystack.com/MethodName",
33+
Req: nil,
34+
Res: nil,
35+
Err: nil,
36+
})
37+
assert.Equal(t, initialAttr, mt.GetAttributes())
38+
39+
mt.RecordUnary(context.Background(), otelgrpc.UnaryParams{
40+
Start: time.Now(),
41+
Method: "",
42+
Req: &pb.ListProjectsRequest{},
43+
Res: nil,
44+
Err: nil,
45+
})
46+
assert.Equal(t, initialAttr, mt.GetAttributes())
47+
48+
mt.RecordStream(context.Background(), time.Now(), "", nil)
49+
assert.Equal(t, initialAttr, mt.GetAttributes())
50+
51+
mt.RecordStream(context.Background(), time.Now(), "/service.raystack.com/MethodName", errors.New("dummy error"))
52+
assert.Equal(t, initialAttr, mt.GetAttributes())
53+
}
54+
55+
func Test_parseFullMethod(t *testing.T) {
56+
type args struct {
57+
fullMethod string
58+
}
59+
tests := []struct {
60+
name string
61+
args args
62+
want []attribute.KeyValue
63+
}{
64+
{name: "should parse correct method", args: args{
65+
fullMethod: "/test.service.name/MethodNameV1",
66+
}, want: []attribute.KeyValue{
67+
semconv.RPCService("test.service.name"),
68+
semconv.RPCMethod("MethodNameV1"),
69+
}},
70+
71+
{name: "should return empty attributes on incorrect method", args: args{
72+
fullMethod: "incorrectMethod",
73+
}, want: nil},
74+
}
75+
for _, tt := range tests {
76+
t.Run(tt.name, func(t *testing.T) {
77+
if got := otelgrpc.ParseFullMethod(tt.args.fullMethod); !reflect.DeepEqual(got, tt.want) {
78+
t.Errorf("parseFullMethod() = %v, want %v", got, tt.want)
79+
}
80+
})
81+
}
82+
}
83+
84+
func Test_getProtoSize(t *testing.T) {
85+
req := &pb.ListProjectNamespacesRequest{
86+
ProjectName: "asd",
87+
}
88+
89+
if got := otelgrpc.GetProtoSize(req); got != 5 {
90+
t.Errorf("getProtoSize() = %v, want %v", got, 5)
91+
}
92+
}
93+
94+
func TestExtractAddress(t *testing.T) {
95+
gotHost, gotPort := otelgrpc.ExtractAddress("localhost:1001")
96+
assert.Equal(t, "localhost", gotHost)
97+
assert.Equal(t, "1001", gotPort)
98+
99+
gotHost, gotPort = otelgrpc.ExtractAddress("localhost")
100+
assert.Equal(t, "localhost", gotHost)
101+
assert.Equal(t, "80", gotPort)
102+
103+
gotHost, gotPort = otelgrpc.ExtractAddress("some.address.golabs.io:15010")
104+
assert.Equal(t, "some.address.golabs.io", gotHost)
105+
assert.Equal(t, "15010", gotPort)
106+
}

0 commit comments

Comments
 (0)