Skip to content

Commit

Permalink
Merge pull request #22 from samcm/feat/peers
Browse files Browse the repository at this point in the history
feat(consensus): Add Peers metrics
  • Loading branch information
samcm authored Jun 7, 2022
2 parents 54fc17d + 973a89f commit b22e3b0
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 15 deletions.
143 changes: 143 additions & 0 deletions pkg/exporter/consensus/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package api

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types"
"github.com/sirupsen/logrus"
)

// ConsensusClient is an interface for executing RPC calls to the Ethereum node.
type ConsensusClient interface {
NodePeer(ctx context.Context, peerID string) (types.Peer, error)
NodePeers(ctx context.Context) (types.Peers, error)
NodePeerCount(ctx context.Context) (types.PeerCount, error)
}

type consensusClient struct {
url string
log logrus.FieldLogger
client http.Client
}

// NewConsensusClient creates a new ConsensusClient.
func NewConsensusClient(ctx context.Context, log logrus.FieldLogger, url string) ConsensusClient {
client := http.Client{
Timeout: time.Second * 10,
}

return &consensusClient{
url: url,
log: log,
client: client,
}
}

type apiResponse struct {
Data json.RawMessage `json:"data"`
}

//nolint:unparam // ctx will probably be used in the future
func (c *consensusClient) post(ctx context.Context, path string, body map[string]interface{}) (json.RawMessage, error) {
jsonData, err := json.Marshal(body)
if err != nil {
return nil, err
}

rsp, err := c.client.Post(c.url, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}

defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("status code: %d", rsp.StatusCode)
}

data, err := io.ReadAll(rsp.Body)
if err != nil {
return nil, err
}

resp := new(apiResponse)
if err := json.Unmarshal(data, resp); err != nil {
return nil, err
}

return resp.Data, nil
}

//nolint:unparam // ctx will probably be used in the future
func (c *consensusClient) get(ctx context.Context, path string) (json.RawMessage, error) {
rsp, err := c.client.Get(c.url + path)
if err != nil {
return nil, err
}

defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("status code: %d", rsp.StatusCode)
}

data, err := io.ReadAll(rsp.Body)
if err != nil {
return nil, err
}

resp := new(apiResponse)
if err := json.Unmarshal(data, resp); err != nil {
return nil, err
}

return resp.Data, nil
}

func (c *consensusClient) NodePeers(ctx context.Context) (types.Peers, error) {
data, err := c.get(ctx, "/eth/v1/node/peers")
if err != nil {
return nil, err
}

rsp := types.Peers{}
if err := json.Unmarshal(data, &rsp); err != nil {
return nil, err
}

return rsp, nil
}

func (c *consensusClient) NodePeer(ctx context.Context, peerID string) (types.Peer, error) {
data, err := c.get(ctx, fmt.Sprintf("/eth/v1/node/peers/%s", peerID))
if err != nil {
return types.Peer{}, err
}

rsp := types.Peer{}
if err := json.Unmarshal(data, &rsp); err != nil {
return types.Peer{}, err
}

return rsp, nil
}

func (c *consensusClient) NodePeerCount(ctx context.Context) (types.PeerCount, error) {
data, err := c.get(ctx, "/eth/v1/node/peer_count")
if err != nil {
return types.PeerCount{}, err
}

rsp := types.PeerCount{}
if err := json.Unmarshal(data, &rsp); err != nil {
return types.PeerCount{}, err
}

return rsp, nil
}
66 changes: 66 additions & 0 deletions pkg/exporter/consensus/api/types/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package types

var PeerStates = []string{
"disconnected",
"connected",
"connecting",
"disconnecting",
}

var PeerDirections = []string{
"inbound",
"outbound",
}

type Peer struct {
PeerID string `json:"peer_id"`
ENR string `json:"enr"`
LastSeenP2PAddress string `json:"last_seen_p2p_address"`
State string `json:"state"`
Direction string `json:"direction"`
}

type Peers []Peer

type PeerCount struct {
Disconnected string `json:"disconnected"`
Connected string `json:"connected"`
Connecting string `json:"connecting"`
Disconnecting string `json:"disconnecting"`
}

func (p *Peers) ByState(state string) Peers {
var peers []Peer

for _, peer := range *p {
if peer.State == state {
peers = append(peers, peer)
}
}

return peers
}

func (p *Peers) ByDirection(direction string) Peers {
var peers []Peer

for _, peer := range *p {
if peer.Direction == direction {
peers = append(peers, peer)
}
}

return peers
}

func (p *Peers) ByStateAndDirection(state, direction string) Peers {
var peers []Peer

for _, peer := range *p {
if peer.State == state && peer.Direction == direction {
peers = append(peers, peer)
}
}

return peers
}
7 changes: 5 additions & 2 deletions pkg/exporter/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/http"
"github.com/rs/zerolog"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/sirupsen/logrus"
)

Expand All @@ -29,6 +30,7 @@ type node struct {
url string
namespace string
client eth2client.Service
api api.ConsensusClient
log logrus.FieldLogger
metrics Metrics
}
Expand All @@ -54,13 +56,14 @@ func (c *node) URL() string {
func (c *node) Bootstrap(ctx context.Context) error {
client, err := http.New(ctx,
http.WithAddress(c.url),
http.WithLogLevel(zerolog.WarnLevel),
http.WithLogLevel(zerolog.Disabled),
)
if err != nil {
return err
}

c.client = client
c.api = api.NewConsensusClient(ctx, c.log, c.url)

return nil
}
Expand All @@ -79,6 +82,6 @@ func (c *node) StartMetrics(ctx context.Context) {
time.Sleep(5 * time.Second)
}

c.metrics = NewMetrics(c.client, c.log, c.name, c.namespace)
c.metrics = NewMetrics(c.client, c.api, c.log, c.name, c.namespace)
c.metrics.StartAsync(ctx)
}
3 changes: 2 additions & 1 deletion pkg/exporter/consensus/jobs/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/prometheus/client_golang/prometheus"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/sirupsen/logrus"
)

Expand All @@ -34,7 +35,7 @@ const (
)

// NewBeacon creates a new Beacon instance.
func NewBeaconJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) Beacon {
func NewBeaconJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Beacon {
constLabels["module"] = NameBeacon
namespace += "_beacon"

Expand Down
3 changes: 2 additions & 1 deletion pkg/exporter/consensus/jobs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/sirupsen/logrus"
)

Expand All @@ -20,7 +21,7 @@ const (
)

// NewEvent creates a new Event instance.
func NewEventJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) Event {
func NewEventJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Event {
constLabels["module"] = NameEvent
namespace += "_event"

Expand Down
3 changes: 2 additions & 1 deletion pkg/exporter/consensus/jobs/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/sirupsen/logrus"
"github.com/spf13/cast"

Expand All @@ -29,7 +30,7 @@ const (
)

// NewForksJob returns a new Forks instance.
func NewForksJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) Forks {
func NewForksJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) Forks {
constLabels["module"] = NameFork

namespace += "_fork"
Expand Down
40 changes: 39 additions & 1 deletion pkg/exporter/consensus/jobs/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,33 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types"
"github.com/sirupsen/logrus"
)

// General reports general information about the node.
type General struct {
client eth2client.Service
api api.ConsensusClient
log logrus.FieldLogger
Slots prometheus.GaugeVec
NodeVersion prometheus.GaugeVec
ClientName prometheus.GaugeVec
Peers prometheus.GaugeVec
}

const (
NameGeneral = "general"
)

// NewGeneral creates a new General instance.
func NewGeneralJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) General {
func NewGeneralJob(client eth2client.Service, ap api.ConsensusClient, log logrus.FieldLogger, namespace string, constLabels map[string]string) General {
constLabels["module"] = NameGeneral

return General{
client: client,
api: ap,
log: log,
Slots: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand All @@ -53,6 +58,18 @@ func NewGeneralJob(client eth2client.Service, log logrus.FieldLogger, namespace
"version",
},
),
Peers: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "peers",
Help: "The count of peers connected to beacon node.",
ConstLabels: constLabels,
},
[]string{
"state",
"direction",
},
),
}
}

Expand Down Expand Up @@ -89,6 +106,10 @@ func (g *General) tick(ctx context.Context) {
g.log.WithError(err).Error("Failed to get beacon slot: ", checkpoint)
}
}

if err := g.GetPeers(ctx); err != nil {
g.log.WithError(err).Error("Failed to get peers")
}
}

func (g *General) GetNodeVersion(ctx context.Context) error {
Expand Down Expand Up @@ -136,6 +157,23 @@ func (g *General) GetBeaconSlot(ctx context.Context, identifier string) error {
return nil
}

func (g *General) GetPeers(ctx context.Context) error {
peers, err := g.api.NodePeers(ctx)
if err != nil {
return err
}

g.Peers.Reset()

for _, state := range types.PeerStates {
for _, direction := range types.PeerDirections {
g.Peers.WithLabelValues(state, direction).Set(float64(len(peers.ByStateAndDirection(state, direction))))
}
}

return nil
}

func (g *General) ObserveSlot(identifier string, slot uint64) {
g.Slots.WithLabelValues(identifier).Set(float64(slot))
}
Loading

0 comments on commit b22e3b0

Please sign in to comment.