Skip to content

Commit 2ea25ad

Browse files
authored
Add publisher prometheus metrics (#154)
* adding metrics * adding metric inc to sender * removing entityName, we can add it if needed in the future
1 parent b1eea4a commit 2ea25ad

File tree

8 files changed

+452
-250
lines changed

8 files changed

+452
-250
lines changed

Diff for: v2/lockrenewer.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@ import (
99

1010
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1111
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
12+
"github.com/Azure/go-shuttle/v2/metrics/processor"
1213
"go.opentelemetry.io/otel/attribute"
1314
"go.opentelemetry.io/otel/trace"
14-
15-
"github.com/Azure/go-shuttle/v2/metrics"
1615
)
1716

1817
// LockRenewer abstracts the servicebus receiver client to only expose lock renewal
@@ -125,7 +124,7 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
125124
err := plr.lockRenewer.RenewMessageLock(ctx, message, nil)
126125
if err != nil {
127126
log(ctx, fmt.Sprintf("failed to renew lock: %s", err))
128-
metrics.Processor.IncMessageLockRenewedFailure(message)
127+
processor.Metric.IncMessageLockRenewedFailure(message)
129128
// The context is canceled when the message handler returns from the processor.
130129
// This can happen if we already entered the interval case when the message processing completes.
131130
// The best we can do is log and retry on the next tick. The sdk already retries operations on recoverable network errors.
@@ -141,14 +140,14 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
141140
continue
142141
}
143142
span.AddEvent("message lock renewed", trace.WithAttributes(attribute.Int("count", count)))
144-
metrics.Processor.IncMessageLockRenewedSuccess(message)
143+
processor.Metric.IncMessageLockRenewedSuccess(message)
145144
case <-ctx.Done():
146145
log(ctx, "context done: stopping periodic renewal")
147146
span.AddEvent("context done: stopping message lock renewal")
148147
err := ctx.Err()
149148
if errors.Is(err, context.DeadlineExceeded) {
150149
span.RecordError(err)
151-
metrics.Processor.IncMessageDeadlineReachedCount(message)
150+
processor.Metric.IncMessageDeadlineReachedCount(message)
152151
}
153152
plr.stop(ctx)
154153
case <-plr.stopped:

Diff for: v2/metrics/processor/types.go

+179
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package processor
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
7+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
8+
prom "github.com/prometheus/client_golang/prometheus"
9+
dto "github.com/prometheus/client_model/go"
10+
)
11+
12+
const (
13+
subsystem = "goshuttle_handler"
14+
messageTypeLabel = "messageType"
15+
deliveryCountLabel = "deliveryCount"
16+
successLabel = "success"
17+
)
18+
19+
var (
20+
metricsRegistry = newRegistry()
21+
// Processor exposes a Recorder interface to manipulate the Processor metrics.
22+
Metric Recorder = metricsRegistry
23+
)
24+
25+
func newRegistry() *Registry {
26+
return &Registry{
27+
MessageReceivedCount: prom.NewCounterVec(prom.CounterOpts{
28+
Name: "message_received_total",
29+
Help: "total number of messages received by the processor",
30+
Subsystem: subsystem,
31+
}, []string{}),
32+
MessageHandledCount: prom.NewCounterVec(prom.CounterOpts{
33+
Name: "message_handled_total",
34+
Help: "total number of messages handled by this handler",
35+
Subsystem: subsystem,
36+
}, []string{messageTypeLabel, deliveryCountLabel}),
37+
MessageLockRenewedCount: prom.NewCounterVec(prom.CounterOpts{
38+
Name: "message_lock_renewed_total",
39+
Help: "total number of message lock renewal",
40+
Subsystem: subsystem,
41+
}, []string{messageTypeLabel, successLabel}),
42+
MessageDeadlineReachedCount: prom.NewCounterVec(prom.CounterOpts{
43+
Name: "message_deadline_reached_total",
44+
Help: "total number of message lock renewal",
45+
Subsystem: subsystem,
46+
}, []string{messageTypeLabel}),
47+
ConcurrentMessageCount: prom.NewGaugeVec(prom.GaugeOpts{
48+
Name: "concurrent_message_count",
49+
Help: "number of messages being handled concurrently",
50+
Subsystem: subsystem,
51+
}, []string{messageTypeLabel}),
52+
}
53+
}
54+
55+
func getMessageTypeLabel(msg *azservicebus.ReceivedMessage) prom.Labels {
56+
typeName := msg.ApplicationProperties["type"]
57+
return map[string]string{
58+
messageTypeLabel: fmt.Sprintf("%s", typeName),
59+
}
60+
}
61+
62+
func (m *Registry) Init(reg prom.Registerer) {
63+
reg.MustRegister(
64+
m.MessageReceivedCount,
65+
m.MessageHandledCount,
66+
m.MessageLockRenewedCount,
67+
m.MessageDeadlineReachedCount,
68+
m.ConcurrentMessageCount)
69+
}
70+
71+
type Registry struct {
72+
MessageReceivedCount *prom.CounterVec
73+
MessageHandledCount *prom.CounterVec
74+
MessageLockRenewedCount *prom.CounterVec
75+
MessageDeadlineReachedCount *prom.CounterVec
76+
ConcurrentMessageCount *prom.GaugeVec
77+
}
78+
79+
// Recorder allows to initialize the metric registry and increase/decrease the registered metrics at runtime.
80+
type Recorder interface {
81+
Init(registerer prom.Registerer)
82+
IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage)
83+
IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage)
84+
IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage)
85+
DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
86+
IncMessageHandled(msg *azservicebus.ReceivedMessage)
87+
IncMessageReceived(float64)
88+
IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
89+
}
90+
91+
// IncMessageLockRenewedSuccess increase the message lock renewal success counter
92+
func (m *Registry) IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage) {
93+
labels := getMessageTypeLabel(msg)
94+
labels[successLabel] = "true"
95+
m.MessageLockRenewedCount.With(labels).Inc()
96+
}
97+
98+
// IncMessageLockRenewedFailure increase the message lock renewal failure counter
99+
func (m *Registry) IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage) {
100+
labels := getMessageTypeLabel(msg)
101+
labels[successLabel] = "false"
102+
m.MessageLockRenewedCount.With(labels).Inc()
103+
}
104+
105+
// IncMessageHandled increase the message Handled
106+
func (m *Registry) IncMessageHandled(msg *azservicebus.ReceivedMessage) {
107+
labels := getMessageTypeLabel(msg)
108+
labels[deliveryCountLabel] = strconv.FormatUint(uint64(msg.DeliveryCount), 10)
109+
m.MessageHandledCount.With(labels).Inc()
110+
}
111+
112+
// IncConcurrentMessageCount increases the concurrent message counter
113+
func (m *Registry) IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
114+
m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Inc()
115+
}
116+
117+
// DecConcurrentMessageCount decreases the concurrent message counter
118+
func (m *Registry) DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
119+
m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Dec()
120+
}
121+
122+
// IncMessageDeadlineReachedCount increases the message deadline reached counter
123+
func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) {
124+
labels := getMessageTypeLabel(msg)
125+
m.MessageDeadlineReachedCount.With(labels).Inc()
126+
}
127+
128+
// IncMessageReceived increases the message received counter
129+
func (m *Registry) IncMessageReceived(count float64) {
130+
m.MessageReceivedCount.With(map[string]string{}).Add(count)
131+
}
132+
133+
// Informer allows to inspect metrics value stored in the registry at runtime
134+
type Informer struct {
135+
registry *Registry
136+
}
137+
138+
// NewInformer creates an Informer for the current registry
139+
func NewInformer() *Informer {
140+
return &Informer{registry: metricsRegistry}
141+
}
142+
143+
// GetMessageLockRenewedFailureCount retrieves the current value of the MessageLockRenewedFailureCount metric
144+
func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) {
145+
var total float64
146+
collect(i.registry.MessageLockRenewedCount, func(m dto.Metric) {
147+
if !hasLabel(m, successLabel, "false") {
148+
return
149+
}
150+
total += m.GetCounter().GetValue()
151+
})
152+
return total, nil
153+
}
154+
155+
func hasLabel(m dto.Metric, key string, value string) bool {
156+
for _, pair := range m.Label {
157+
if pair == nil {
158+
continue
159+
}
160+
if pair.GetName() == key && pair.GetValue() == value {
161+
return true
162+
}
163+
}
164+
return false
165+
}
166+
167+
// collect calls the function for each metric associated with the Collector
168+
func collect(col prom.Collector, do func(dto.Metric)) {
169+
c := make(chan prom.Metric)
170+
go func(c chan prom.Metric) {
171+
col.Collect(c)
172+
close(c)
173+
}(c)
174+
for x := range c { // eg range across distinct label vector values
175+
m := dto.Metric{}
176+
_ = x.Write(&m)
177+
do(m)
178+
}
179+
}

Diff for: v2/metrics/processor/types_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package processor
2+
3+
import (
4+
"testing"
5+
6+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
7+
. "github.com/onsi/gomega"
8+
"github.com/prometheus/client_golang/prometheus"
9+
)
10+
11+
type fakeRegistry struct {
12+
collectors []prometheus.Collector
13+
}
14+
15+
func (f *fakeRegistry) Register(c prometheus.Collector) error {
16+
panic("implement me")
17+
}
18+
19+
func (f *fakeRegistry) MustRegister(c ...prometheus.Collector) {
20+
f.collectors = append(f.collectors, c...)
21+
}
22+
23+
func (f *fakeRegistry) Unregister(c prometheus.Collector) bool {
24+
panic("implement me")
25+
}
26+
27+
func TestRegistry_Init(t *testing.T) {
28+
g := NewWithT(t)
29+
r := newRegistry()
30+
fRegistry := &fakeRegistry{}
31+
g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
32+
g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
33+
g.Expect(fRegistry.collectors).To(HaveLen(5))
34+
Metric.IncMessageReceived(10)
35+
36+
}
37+
38+
func TestMetrics(t *testing.T) {
39+
type testcase struct {
40+
name string
41+
msg *azservicebus.ReceivedMessage
42+
}
43+
for _, tc := range []testcase{
44+
{
45+
name: "no type property",
46+
msg: &azservicebus.ReceivedMessage{},
47+
},
48+
{
49+
name: "with type property",
50+
msg: &azservicebus.ReceivedMessage{
51+
ApplicationProperties: map[string]interface{}{
52+
"type": "someType",
53+
},
54+
},
55+
},
56+
} {
57+
g := NewWithT(t)
58+
r := newRegistry()
59+
registerer := prometheus.NewRegistry()
60+
informer := &Informer{registry: r}
61+
62+
// before init
63+
count, err := informer.GetMessageLockRenewedFailureCount()
64+
g.Expect(err).ToNot(HaveOccurred())
65+
g.Expect(count).To(Equal(float64(0)))
66+
67+
// after init, count 0
68+
g.Expect(func() { r.Init(registerer) }).ToNot(Panic())
69+
count, err = informer.GetMessageLockRenewedFailureCount()
70+
g.Expect(err).ToNot(HaveOccurred())
71+
g.Expect(count).To(Equal(float64(0)))
72+
73+
// count incremented
74+
r.IncMessageLockRenewedFailure(tc.msg)
75+
count, err = informer.GetMessageLockRenewedFailureCount()
76+
g.Expect(err).ToNot(HaveOccurred())
77+
g.Expect(count).To(Equal(float64(1)))
78+
79+
// count failure only
80+
r.IncMessageLockRenewedSuccess(tc.msg)
81+
count, err = informer.GetMessageLockRenewedFailureCount()
82+
g.Expect(err).ToNot(HaveOccurred())
83+
g.Expect(count).To(Equal(float64(1)))
84+
}
85+
86+
}

0 commit comments

Comments
 (0)