Skip to content

Commit

Permalink
feat_: add ability to process prometheus metrics in telemetry client
Browse files Browse the repository at this point in the history
  • Loading branch information
vpavlin committed Aug 29, 2024
1 parent 9b9a91f commit d10769e
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cmd/status-cli/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func createAccountAndLogin(b *api.GethStatusBackend, rootDataDir, password strin
HTTPHost: "127.0.0.1",
HTTPPort: p.Port,
},
TelemetryServerURL: p.TelemetryURL,
TelemetryServerURL: p.TelemetryURL,
WakuV2EnableMissingMessageVerification: true,
}
return b.CreateAccountAndLogin(req,
params.WithFleet(p.Fleet),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ require (
github.com/andybalholm/brotli v1.0.5
github.com/bits-and-blooms/bloom/v3 v3.7.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/go-auxiliaries/shrinking-map v0.3.0
github.com/gorilla/sessions v1.2.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/jellydator/ttlcache/v3 v3.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ github.com/glycerine/go-unsnap-stream v0.0.0-20210130063903-47dfef350d96/go.mod
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
github.com/glycerine/goconvey v0.0.0-20190315024820-982ee783a72e/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
github.com/go-auxiliaries/shrinking-map v0.3.0 h1:kXiLmFY4y2s35WtOYAb02LRZ92IRnfzio+3prZn6ULs=
github.com/go-auxiliaries/shrinking-map v0.3.0/go.mod h1:UtBmTTKuUfI8wkhzaZ7G/xgHjxGxLwM2a6kf+aWmSmc=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g=
Expand Down
60 changes: 60 additions & 0 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"
Expand Down Expand Up @@ -92,6 +93,11 @@ type PeerConnFailure struct {
FailureCount int
}

// PrometheusMetricWrapper pairs an already JSON-encoded telemetry payload with
// the telemetry type it is to be sent under. processAndPushTelemetry copies
// both fields verbatim into the outgoing TelemetryRequest.
type PrometheusMetricWrapper struct {
// Typ becomes the TelemetryType of the outgoing request.
Typ TelemetryType
// Data is the pre-serialized JSON body used as the request's TelemetryData.
Data *json.RawMessage
}

type Client struct {
serverURL string
httpClient *http.Client
Expand All @@ -109,6 +115,8 @@ type Client struct {
sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int

promMetrics *PrometheusMetrics
}

type TelemetryClientOption func(*Client)
Expand Down Expand Up @@ -143,8 +151,13 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str
sendPeriod: 10 * time.Second, // default value
lastPeerCount: 0,
lastPeerConnFailures: make(map[string]int),

promMetrics: NewPrometheusMetrics(),
}

//client.promMetrics.Register("waku_connected_peers", GaugeType, nil, nil)
client.promMetrics.Register("waku2_envelopes_validated_total", CounterType, prometheus.Labels{"type": "relay"}, client.ProcessReuglarStoryRetrievedMsgs)

for _, opt := range opts {
opt(client)
}
Expand Down Expand Up @@ -180,6 +193,8 @@ func (c *Client) Start(ctx context.Context) {
c.telemetryCacheLock.Unlock()

if len(telemetryRequests) > 0 {
d, _ := json.MarshalIndent(telemetryRequests, "", " ")
fmt.Println(string(d))
err := c.pushTelemetryRequest(telemetryRequests)
if err != nil {
if sendPeriod < 60*time.Second { //Stop the growing if the timer is > 60s to at least retry every minute
Expand All @@ -196,6 +211,21 @@ func (c *Client) Start(ctx context.Context) {
}

}()

go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
fmt.Println("exit")
return
case <-ticker.C:
c.promMetrics.Snapshot()

}
}
}()
}

func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) {
Expand Down Expand Up @@ -237,6 +267,13 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
case PrometheusMetricWrapper:
pmd := data.(PrometheusMetricWrapper)
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: pmd.Typ,
TelemetryData: pmd.Data,
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -413,3 +450,26 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err))
}
}

// ProcessReuglarStoryRetrievedMsgs turns a Prometheus sample into a telemetry
// record and queues it through processAndPushTelemetry.
//
// The record carries the sample value as "msgCount", the sample's
// "pubsubTopic" label, the client's identifying fields and the current Unix
// timestamp. It is registered in NewClient as the callback for the
// "waku2_envelopes_validated_total" counter.
//
// NOTE(review): "Reuglar" looks like a misspelling of "Regular". The exported
// name and the telemetry type string are left untouched here because callers
// and the receiving server may depend on them — confirm before renaming.
func (c *Client) ProcessReuglarStoryRetrievedMsgs(data MetricPayload) {
	postBody := map[string]interface{}{
		"msgCount":      data.Value,
		"pubsubTopic":   data.Labels["pubsubTopic"],
		"nodeName":      c.nodeName,
		"nodeKeyUID":    c.keyUID,
		"peerId":        c.peerId,
		"statusVersion": c.version,
		"timestamp":     time.Now().Unix(),
	}

	telemetryData, err := json.Marshal(postBody)
	if err != nil {
		// Report the failure instead of dropping the sample silently.
		c.logger.Error("Error marshaling prometheus metric telemetry", zap.Error(err))
		return
	}

	rawData := json.RawMessage(telemetryData)

	c.processAndPushTelemetry(context.Background(), PrometheusMetricWrapper{Typ: "ReuglarStoryRetrievedMsgs", Data: &rawData})
}
123 changes: 123 additions & 0 deletions telemetry/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package telemetry

import (
"log"

"github.com/prometheus/client_golang/prometheus"
)

// MetricType identifies which kind of Prometheus sample a registered metric is
// expected to be; Snapshot uses it to decide which value field to read.
type MetricType int

const (
// The zero value is deliberately skipped, so valid types start at 1.
_ MetricType = iota
// CounterType makes Snapshot read the sample's counter value.
CounterType
// GaugeType makes Snapshot read the sample's gauge value.
GaugeType
)

// ToTelemetryRequest is the callback Snapshot invokes with the payload built
// for each matching sample of a registered metric.
type ToTelemetryRequest func(payload MetricPayload)

// MetricPayload is the flattened form of a single Prometheus sample that
// Snapshot passes to a metric's callback.
type MetricPayload struct {
// Labels holds every label name/value pair carried by the sample.
Labels map[string]string
// Name is the name of the metric family the sample belongs to.
Name string
// Value is the sample's counter or gauge reading.
Value float64
}

// Metric describes one registered metric: which sample kind to read, which
// label values a sample must carry, and where matching samples are delivered.
type Metric struct {
// typ selects whether Snapshot reads the counter or the gauge value.
typ MetricType
// labels is the filter: a sample is forwarded only if it carries all of
// these name/value pairs. A nil map disables filtering.
labels map[string]string
// toTelemetryRequest receives the payload of every matching sample.
toTelemetryRequest ToTelemetryRequest
}

// PrometheusMetrics is a registry of the Prometheus metrics that Snapshot
// should forward to telemetry callbacks.
type PrometheusMetrics struct {
// metrics maps a metric family name to its registration.
metrics map[string]Metric
}

// NewPrometheusMetrics returns an empty registry whose metric table is already
// allocated, so Register can be called on it immediately.
func NewPrometheusMetrics() *PrometheusMetrics {
	pm := new(PrometheusMetrics)
	pm.metrics = map[string]Metric{}
	return pm
}

// Register records (or overwrites) the registration for the metric family
// called name. Snapshot will read samples of that family as typ, keep only
// those carrying every pair in labels, and pass them to toTelemetryRequest.
func (pm *PrometheusMetrics) Register(name string, typ MetricType, labels prometheus.Labels, toTelemetryRequest ToTelemetryRequest) {
	entry := Metric{
		typ:                typ,
		labels:             labels,
		toTelemetryRequest: toTelemetryRequest,
	}
	pm.metrics[name] = entry
}

// Snapshot gathers the current state of the default Prometheus gatherer and
// passes every sample of a registered metric to that metric's callback.
//
// A sample is forwarded only when it carries every label/value pair the metric
// was registered with; a nil filter accepts all samples. Samples are skipped,
// rather than reported with a bogus value, when the registration has no
// callback, when the registered type is unknown, or when the sample does not
// actually contain a value of the registered type.
//
// A gather failure is logged and the snapshot is abandoned; it never
// terminates the process.
func (pm *PrometheusMetrics) Snapshot() {
	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		// Gather may return partial, incomplete families alongside an error;
		// skip this round instead of reporting possibly inconsistent values.
		log.Printf("Failed to gather metrics: %v", err)
		return
	}

	for _, mf := range families {
		registered, ok := pm.metrics[mf.GetName()]
		if !ok || registered.toTelemetryRequest == nil {
			continue
		}

		for _, m := range mf.GetMetric() {
			// The nil-safe generated getters avoid panics on unset fields.
			labels := make(map[string]string, len(m.GetLabel()))
			for _, l := range m.GetLabel() {
				labels[l.GetName()] = l.GetValue()
			}
			if !hasRequiredLabels(labels, registered.labels) {
				continue
			}

			var value float64
			switch registered.typ {
			case CounterType:
				counter := m.GetCounter()
				if counter == nil {
					continue // registered as a counter but gathered as something else
				}
				value = counter.GetValue()
			case GaugeType:
				gauge := m.GetGauge()
				if gauge == nil {
					continue // registered as a gauge but gathered as something else
				}
				value = gauge.GetValue()
			default:
				continue // unknown metric type: nothing meaningful to report
			}

			registered.toTelemetryRequest(MetricPayload{
				Name:   mf.GetName(),
				Value:  value,
				Labels: labels,
			})
		}
	}
}

// hasRequiredLabels reports whether actual contains every name/value pair in
// required. An empty or nil required set always matches.
func hasRequiredLabels(actual, required map[string]string) bool {
	for name, want := range required {
		if got, ok := actual[name]; !ok || got != want {
			return false
		}
	}
	return true
}

// Get is currently an empty placeholder: it accepts a metric name but performs
// no lookup and returns nothing.
func (pm *PrometheusMetrics) Get(name string) {

}
3 changes: 3 additions & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ github.com/francoispqt/gojay
# github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08
## explicit
github.com/gballet/go-libpcsclite
# github.com/go-auxiliaries/shrinking-map v0.3.0
## explicit; go 1.18
github.com/go-auxiliaries/shrinking-map/pkg/shrinking-map
# github.com/go-ole/go-ole v1.2.6
## explicit; go 1.12
github.com/go-ole/go-ole
Expand Down
11 changes: 8 additions & 3 deletions wakuv2/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ var (
Name: "waku2_envelopes_received_total",
Help: "Number of envelopes received.",
})
EnvelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{
EnvelopesValidatedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_envelopes_validated_total",
Help: "Number of envelopes processed successfully.",
})
}, []string{"pubsubTopic", "type"})
EnvelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_envelopes_rejected_total",
Help: "Number of envelopes rejected.",
Expand All @@ -48,12 +48,17 @@ var (
Help: "Size of processed Waku envelopes in bytes.",
Buckets: prom.ExponentialBuckets(256, 4, 10),
})
/*EnvelopeProcessed = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_processed_envelope",
Help: "Missing messages retrieved by regular store queries",
}, []string{"pubsubTopic", "type"})*/
)

// init registers the Waku envelope collectors with the default Prometheus
// registry. MustRegister panics if any of the registrations fails.
func init() {
	prom.MustRegister(
		EnvelopesReceivedCounter,
		EnvelopesRejectedCounter,
		EnvelopesValidatedCounter,
		EnvelopesCacheFailedCounter,
		EnvelopesCachedCounter,
		EnvelopesSizeMeter,
	)
}
8 changes: 6 additions & 2 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"

"go.uber.org/zap"

Expand Down Expand Up @@ -273,6 +274,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
node.WithLogLevel(logger.Level()),
node.WithClusterID(cfg.ClusterID),
node.WithMaxMsgSize(1024 * 1024),
node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
}

if cfg.EnableDiscV5 {
Expand Down Expand Up @@ -1104,10 +1106,11 @@ func (w *Waku) Start() error {
w.logger)

w.missingMsgVerifier.Start(w.ctx)
w.logger.Info("Started missing message verifier")

w.wg.Add(1)
go func() {
w.wg.Done()
defer w.wg.Done()
for {
select {
case <-w.ctx.Done():
Expand All @@ -1117,6 +1120,7 @@ func (w *Waku) Start() error {
if err != nil {
w.logger.Error("OnNewEnvelopes error", zap.Error(err))
}
w.logger.Info("Got a missing message!")
}
}
}()
Expand Down Expand Up @@ -1346,7 +1350,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
trouble = true
}

common.EnvelopesValidatedCounter.Inc()
common.EnvelopesValidatedCounter.With(prometheus.Labels{"pubsubTopic": envelope.PubsubTopic(), "type": msgType}).Inc()

if trouble {
return errors.New("received invalid envelope")
Expand Down

0 comments on commit d10769e

Please sign in to comment.