Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry)_: track peers by shard and origin #5819

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"

v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"

v1protocol "github.com/status-im/status-go/protocol/v1"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)

type TelemetryType string
Expand All @@ -32,8 +32,9 @@ const (
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"

MaxRetryCache = 5000
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
MaxRetryCache = 5000
)

type TelemetryRequest struct {
Expand Down Expand Up @@ -79,6 +80,18 @@ func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[
}
}

// PushPeerCountByShard pushes one PeerCountByShard telemetry record for every
// entry of peerCountByShard, where the map key is the shard and the value is
// the count reported for it. Records are emitted in map iteration order,
// which Go leaves unspecified.
func (c *Client) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) {
	for shardID := range peerCountByShard {
		sample := PeerCountByShard{
			Shard: shardID,
			Count: peerCountByShard[shardID],
		}
		c.processAndPushTelemetry(ctx, sample)
	}
}

// PushPeerCountByOrigin pushes one PeerCountByOrigin telemetry record for
// every entry of peerCountByOrigin, where the map key is the peer origin and
// the value is the count reported for it. Records are emitted in map
// iteration order, which Go leaves unspecified.
func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) {
	for peerOrigin := range peerCountByOrigin {
		sample := PeerCountByOrigin{
			Origin: peerOrigin,
			Count:  peerCountByOrigin[peerOrigin],
		}
		c.processAndPushTelemetry(ctx, sample)
	}
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand All @@ -94,6 +107,16 @@ type PeerConnFailure struct {
FailureCount int
}

// PeerCountByShard is a single telemetry sample pairing a shard with a count.
// It is the data type that processAndPushTelemetry reports under
// PeerCountByShardMetric.
type PeerCountByShard struct {
// Shard is the shard identifier the sample refers to.
Shard uint16
// Count is the value reported for Shard (serialized as "count").
Count uint
}

// PeerCountByOrigin is a single telemetry sample pairing a peer origin with a
// count. It is the data type that processAndPushTelemetry reports under
// PeerCountByOriginMetric.
type PeerCountByOrigin struct {
// Origin is the peerstore origin value the sample refers to.
Origin wps.Origin
// Count is the value reported for Origin (serialized as "count").
Count uint
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -246,6 +269,18 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
case PeerCountByShard:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerCountByShardMetric,
TelemetryData: c.ProcessPeerCountByShard(v),
}
case PeerCountByOrigin:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerCountByOriginMetric,
TelemetryData: c.ProcessPeerCountByOrigin(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -383,6 +418,24 @@ func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.R
return &jsonRawMessage
}

// ProcessPeerCountByShard builds the JSON telemetry payload for one
// PeerCountByShard sample: the fields supplied by commonPostBody plus
// "shard" and "count".
//
// A marshalling failure is logged instead of being silently discarded. The
// return value is unchanged from the previous behavior: on failure the
// returned pointer refers to an empty RawMessage.
func (c *Client) ProcessPeerCountByShard(peerCountByShard PeerCountByShard) *json.RawMessage {
	postBody := c.commonPostBody()
	postBody["shard"] = peerCountByShard.Shard
	postBody["count"] = peerCountByShard.Count

	body, err := json.Marshal(postBody)
	if err != nil {
		c.logger.Error("failed to marshal peer count by shard telemetry", zap.Error(err))
	}

	jsonRawMessage := json.RawMessage(body)
	return &jsonRawMessage
}

// ProcessPeerCountByOrigin builds the JSON telemetry payload for one
// PeerCountByOrigin sample: the fields supplied by commonPostBody plus
// "origin" and "count".
//
// A marshalling failure is logged instead of being silently discarded. The
// return value is unchanged from the previous behavior: on failure the
// returned pointer refers to an empty RawMessage.
func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *json.RawMessage {
	postBody := c.commonPostBody()
	postBody["origin"] = peerCountByOrigin.Origin
	postBody["count"] = peerCountByOrigin.Count

	body, err := json.Marshal(postBody)
	if err != nil {
		c.logger.Error("failed to marshal peer count by origin telemetry", zap.Error(err))
	}

	jsonRawMessage := json.RawMessage(body)
	return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
Expand Down
64 changes: 59 additions & 5 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type ITelemetryClient interface {
PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(ctx context.Context, peerCount int)
PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int)
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -1092,6 +1094,22 @@ func (w *Waku) Start() error {
}
}()

if w.cfg.TelemetryServerURL != "" {
go func() {
peerTelemetryTicker := time.NewTicker(10 * time.Second)
defer peerTelemetryTicker.Stop()

for {
select {
case <-w.ctx.Done():
return
case <-peerTelemetryTicker.C:
w.reportPeerMetrics()
}
}
}()
}

go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
//TODO: commenting for now so that only fleet nodes are used.
//Need to uncomment once filter peer scoring etc is implemented.
Expand Down Expand Up @@ -1187,16 +1205,52 @@ func (w *Waku) checkForConnectionChanges() {
w.onPeerStats(latestConnStatus)
}

w.ConnectionChanged(connection.State{
Type: w.state.Type, //setting state type as previous one since there won't be a change here
Offline: !latestConnStatus.IsOnline,
})
}

func (w *Waku) reportPeerMetrics() {
if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures)
}

w.ConnectionChanged(connection.State{
Type: w.state.Type, //setting state type as previous one since there won't be a change here
Offline: !latestConnStatus.IsOnline,
})
peerCountByOrigin := make(map[wps.Origin]uint)
peerCountByShard := make(map[uint16]uint)
for _, peerID := range w.node.Host().Network().Peers() {
wakuPeerStore := w.node.Host().Peerstore().(wps.WakuPeerstore)

origin, err := wakuPeerStore.Origin(peerID)
if err != nil {
continue
}

peerCountByOrigin[origin]++
pubsubTopics, err := wakuPeerStore.PubSubTopics(peerID)
if err != nil {
continue
}

keys := make([]string, 0, len(pubsubTopics))
for k := range pubsubTopics {
keys = append(keys, k)
}
relayShards, err := protocol.TopicsToRelayShards(keys...)
if err != nil {
continue
}

for _, shards := range relayShards {
for _, shard := range shards.ShardIDs {
peerCountByShard[shard]++
}
}
}
w.statusTelemetryClient.PushPeerCountByShard(w.ctx, peerCountByShard)
w.statusTelemetryClient.PushPeerCountByOrigin(w.ctx, peerCountByOrigin)
}
}

func (w *Waku) startMessageSender() error {
Expand Down