Skip to content

Commit

Permalink
Merge pull request #20 from samcm/feat/events-and-beacon
Browse files Browse the repository at this point in the history
feat(consensus): Add Beacon and Event jobs
samcm authored Jun 6, 2022
2 parents 13d2818 + de3ae63 commit 54fc17d
Showing 9 changed files with 492 additions and 114 deletions.
282 changes: 282 additions & 0 deletions pkg/exporter/consensus/jobs/beacon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
package jobs

import (
"context"
"errors"
"time"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

// Beacon reports Beacon information about the beacon chain.
type Beacon struct {
client eth2client.Service
log logrus.FieldLogger
Slot prometheus.GaugeVec
Transactions prometheus.GaugeVec
Slashings prometheus.GaugeVec
Attestations prometheus.GaugeVec
Deposits prometheus.GaugeVec
VoluntaryExits prometheus.GaugeVec
FinalityCheckpoints prometheus.GaugeVec
ReOrgs prometheus.Counter
ReOrgDepth prometheus.Counter

currentVersion string
}

const (
NameBeacon = "beacon"
)

// NewBeacon creates a new Beacon instance.
func NewBeaconJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) Beacon {
constLabels["module"] = NameBeacon
namespace += "_beacon"

return Beacon{
client: client,
log: log,
Slot: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "slot",
Help: "The slot number in the block.",
ConstLabels: constLabels,
},
[]string{
"block_id",
"version",
},
),
Transactions: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "transactions",
Help: "The amount of transactions in the block.",
ConstLabels: constLabels,
},
[]string{
"block_id",
"version",
},
),
Slashings: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "slashings",
Help: "The amount of slashings in the block.",
ConstLabels: constLabels,
},
[]string{
"block_id",
"version",
"type",
},
),
Attestations: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "attestations",
Help: "The amount of attestations in the block.",
ConstLabels: constLabels,
},
[]string{
"block_id",
"version",
},
),
Deposits: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "deposits",
Help: "The amount of deposits in the block.",
ConstLabels: constLabels,
},
[]string{
"block_id",
"version",
},
),
VoluntaryExits: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "voluntary_exits",
Help: "The amount of voluntary exits in the block.",
ConstLabels: constLabels,
},
[]string{
"block_id",
"version",
},
),
FinalityCheckpoints: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "finality_checkpoint_epochs",
Help: "That epochs of the finality checkpoints.",
ConstLabels: constLabels,
},
[]string{
"state_id",
"checkpoint",
},
),
ReOrgs: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "reorg_count",
Help: "The count of reorgs.",
ConstLabels: constLabels,
},
),
ReOrgDepth: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "reorg_depth",
Help: "The number of reorgs.",
ConstLabels: constLabels,
},
),
}
}

func (b *Beacon) Name() string {
return NameBeacon
}

func (b *Beacon) Start(ctx context.Context) {
b.tick(ctx)

for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 5):
b.tick(ctx)
}
}
}

func (b *Beacon) tick(ctx context.Context) {
for _, id := range []string{"head", "finalized", "justified"} {
if id != "justified" {
if err := b.GetFinality(ctx, id); err != nil {
b.log.WithError(err).Error("Failed to get finality")
}
}

if err := b.GetSignedBeaconBlock(ctx, id); err != nil {
b.log.WithError(err).Error("Failed to get signed beacon block")
}
}
}

func (b *Beacon) HandleEvent(ctx context.Context, event *v1.Event) {
if event.Topic == EventTopicBlock {
if err := b.GetSignedBeaconBlock(ctx, "head"); err != nil {
b.log.WithError(err).Error("Failed to get signed beacon block")
}
}

if event.Topic == EventTopicChainReorg {
b.handleChainReorg(event)
}
}

func (b *Beacon) handleChainReorg(event *v1.Event) {
reorg, ok := event.Data.(*v1.ChainReorgEvent)
if !ok {
return
}

b.ReOrgs.Inc()
b.ReOrgDepth.Add(float64(reorg.Depth))
}

func (b *Beacon) GetSignedBeaconBlock(ctx context.Context, blockID string) error {
provider, isProvider := b.client.(eth2client.SignedBeaconBlockProvider)
if !isProvider {
return errors.New("client does not implement eth2client.SignedBeaconBlockProvider")
}

signedBeaconBlock, err := provider.SignedBeaconBlock(ctx, blockID)
if err != nil {
return err
}

if err := b.handleSingleBlock(blockID, signedBeaconBlock); err != nil {
return err
}

return nil
}

func (b *Beacon) GetFinality(ctx context.Context, stateID string) error {
provider, isProvider := b.client.(eth2client.FinalityProvider)
if !isProvider {
return errors.New("client does not implement eth2client.FinalityProvider")
}

finality, err := provider.Finality(ctx, stateID)
if err != nil {
return err
}

b.FinalityCheckpoints.
WithLabelValues(stateID, "previous_justified").
Set(float64(finality.PreviousJustified.Epoch))

b.FinalityCheckpoints.
WithLabelValues(stateID, "justified").
Set(float64(finality.Justified.Epoch))

b.FinalityCheckpoints.
WithLabelValues(stateID, "finalized").
Set(float64(finality.Finalized.Epoch))

return nil
}

func (b *Beacon) handleSingleBlock(blockID string, block *spec.VersionedSignedBeaconBlock) error {
if b.currentVersion != block.Version.String() {
b.Transactions.Reset()
b.Slashings.Reset()
b.Attestations.Reset()
b.Deposits.Reset()
b.VoluntaryExits.Reset()

b.currentVersion = block.Version.String()
}

var beaconBlock BeaconBlock

switch block.Version {
case spec.DataVersionPhase0:
beaconBlock = NewBeaconBlockFromPhase0(block)
case spec.DataVersionAltair:
beaconBlock = NewBeaconBlockFromAltair(block)
case spec.DataVersionBellatrix:
beaconBlock = NewBeaconBlockFromBellatrix(block)
default:
return errors.New("received beacon block of unknown spec version")
}

b.recordNewBeaconBlock(blockID, block.Version.String(), beaconBlock)

return nil
}

func (b *Beacon) recordNewBeaconBlock(blockID, version string, block BeaconBlock) {
b.Slot.WithLabelValues(blockID, version).Set(float64(block.Slot))
b.Slashings.WithLabelValues(blockID, version, "proposer").Set(float64(block.ProposerSlashings))
b.Slashings.WithLabelValues(blockID, version, "attester").Set(float64(block.ProposerSlashings))
b.Attestations.WithLabelValues(blockID, version).Set(float64(block.Attestations))
b.Deposits.WithLabelValues(blockID, version).Set(float64(block.Deposits))
b.VoluntaryExits.WithLabelValues(blockID, version).Set(float64(block.VoluntaryExits))
b.Transactions.WithLabelValues(blockID, version).Set(float64(block.Transactions))
}
51 changes: 51 additions & 0 deletions pkg/exporter/consensus/jobs/beacon_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package jobs

import (
"github.com/attestantio/go-eth2-client/spec"
)

type BeaconBlock struct {
AttesterSlashings int
ProposerSlashings int
Transactions int
Deposits int
VoluntaryExits int
Attestations int
Slot uint64
}

func NewBeaconBlockFromPhase0(block *spec.VersionedSignedBeaconBlock) BeaconBlock {
return BeaconBlock{
AttesterSlashings: len(block.Phase0.Message.Body.AttesterSlashings),
ProposerSlashings: len(block.Phase0.Message.Body.ProposerSlashings),
Transactions: 0,
Deposits: len(block.Phase0.Message.Body.Deposits),
VoluntaryExits: len(block.Phase0.Message.Body.VoluntaryExits),
Attestations: len(block.Phase0.Message.Body.Attestations),
Slot: uint64(block.Phase0.Message.Slot),
}
}

func NewBeaconBlockFromAltair(block *spec.VersionedSignedBeaconBlock) BeaconBlock {
return BeaconBlock{
AttesterSlashings: len(block.Altair.Message.Body.AttesterSlashings),
ProposerSlashings: len(block.Altair.Message.Body.ProposerSlashings),
Transactions: 0,
Deposits: len(block.Altair.Message.Body.Deposits),
VoluntaryExits: len(block.Altair.Message.Body.VoluntaryExits),
Attestations: len(block.Altair.Message.Body.Attestations),
Slot: uint64(block.Altair.Message.Slot),
}
}

func NewBeaconBlockFromBellatrix(block *spec.VersionedSignedBeaconBlock) BeaconBlock {
return BeaconBlock{
AttesterSlashings: len(block.Bellatrix.Message.Body.AttesterSlashings),
ProposerSlashings: len(block.Bellatrix.Message.Body.ProposerSlashings),
Transactions: len(block.Bellatrix.Message.Body.ExecutionPayload.Transactions),
Deposits: len(block.Bellatrix.Message.Body.Deposits),
VoluntaryExits: len(block.Bellatrix.Message.Body.VoluntaryExits),
Attestations: len(block.Bellatrix.Message.Body.Attestations),
Slot: uint64(block.Bellatrix.Message.Slot),
}
}
51 changes: 51 additions & 0 deletions pkg/exporter/consensus/jobs/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package jobs

import (
"context"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

// Event reports event counts.
type Event struct {
log logrus.FieldLogger
Count prometheus.CounterVec
}

const (
NameEvent = "event"
)

// NewEvent creates a new Event instance.
func NewEventJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) Event {
constLabels["module"] = NameEvent
namespace += "_event"

return Event{
log: log,
Count: *prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "count",
Help: "The count of beacon events.",
ConstLabels: constLabels,
},
[]string{
"name",
},
),
}
}

func (b *Event) Name() string {
return NameEvent
}

func (b *Event) Start(ctx context.Context) {}

func (b *Event) HandleEvent(ctx context.Context, event *v1.Event) {
b.Count.WithLabelValues(event.Topic).Inc()
}
4 changes: 4 additions & 0 deletions pkg/exporter/consensus/jobs/forks.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (
"github.com/spf13/cast"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
)

// Forks reports the state of any forks (previous, active or upcoming).
@@ -101,6 +102,9 @@ func (f *Forks) tick(ctx context.Context) {
}
}

func (f *Forks) HandleEvent(ctx context.Context, event *v1.Event) {
}

func (f *Forks) ForkEpochs(ctx context.Context) error {
spec, err := f.getSpec(ctx)
if err != nil {
118 changes: 7 additions & 111 deletions pkg/exporter/consensus/jobs/general.go
Original file line number Diff line number Diff line change
@@ -13,13 +13,11 @@ import (

// General reports general information about the node.
type General struct {
client eth2client.Service
log logrus.FieldLogger
Slots prometheus.GaugeVec
NodeVersion prometheus.GaugeVec
ReOrgs prometheus.Counter
ReOrgDepth prometheus.Counter
FinalityCheckpoints prometheus.GaugeVec
client eth2client.Service
log logrus.FieldLogger
Slots prometheus.GaugeVec
NodeVersion prometheus.GaugeVec
ClientName prometheus.GaugeVec
}

const (
@@ -55,34 +53,6 @@ func NewGeneralJob(client eth2client.Service, log logrus.FieldLogger, namespace
"version",
},
),
ReOrgs: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "reorg_count",
Help: "The count of reorgs.",
ConstLabels: constLabels,
},
),
ReOrgDepth: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "reorg_depth",
Help: "The number of reorgs.",
ConstLabels: constLabels,
},
),
FinalityCheckpoints: *prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "finality_checkpoint_epochs",
Help: "That epochs of the finality checkpoints.",
ConstLabels: constLabels,
},
[]string{
"state_id",
"checkpoint",
},
),
}
}

@@ -93,63 +63,18 @@ func (g *General) Name() string {
func (g *General) Start(ctx context.Context) {
g.tick(ctx)

subscribed := false

if err := g.startSubscriptions(ctx); err == nil {
subscribed = true
}

for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 15):
g.tick(ctx)

if !subscribed {
if err := g.startSubscriptions(ctx); err == nil {
subscribed = true
}
}
}
}
}

func (g *General) startSubscriptions(ctx context.Context) error {
g.log.Info("starting subscriptions")

provider, isProvider := g.client.(eth2client.EventsProvider)
if !isProvider {
return errors.New("client does not implement eth2client.Subscriptions")
}

topics := []string{
"chain_reorg",
}

if err := provider.Events(ctx, topics, g.handleEvent); err != nil {
return err
}

return nil
}

func (g *General) handleEvent(event *v1.Event) {
//nolint:gocritic // new subscription topics coming soon
switch event.Topic {
case "chain_reorg":
g.handleChainReorg(event)
}
}

func (g *General) handleChainReorg(event *v1.Event) {
reorg, ok := event.Data.(*v1.ChainReorgEvent)
if !ok {
return
}
func (g *General) HandleEvent(ctx context.Context, event *v1.Event) {

g.ReOrgs.Inc()
g.ReOrgDepth.Add(float64(reorg.Depth))
}

func (g *General) tick(ctx context.Context) {
@@ -163,10 +88,6 @@ func (g *General) tick(ctx context.Context) {
if err := g.GetBeaconSlot(ctx, checkpoint); err != nil {
g.log.WithError(err).Error("Failed to get beacon slot: ", checkpoint)
}

if err := g.GetFinality(ctx, checkpoint); err != nil {
g.log.WithError(err).Error("Failed to get finality checkpoint: ", checkpoint)
}
}
}

@@ -181,6 +102,7 @@ func (g *General) GetNodeVersion(ctx context.Context) error {
return err
}

g.NodeVersion.Reset()
g.NodeVersion.WithLabelValues(version).Set(1)

return nil
@@ -217,29 +139,3 @@ func (g *General) GetBeaconSlot(ctx context.Context, identifier string) error {
func (g *General) ObserveSlot(identifier string, slot uint64) {
g.Slots.WithLabelValues(identifier).Set(float64(slot))
}

func (g *General) GetFinality(ctx context.Context, stateID string) error {
provider, isProvider := g.client.(eth2client.FinalityProvider)
if !isProvider {
return errors.New("client does not implement eth2client.FinalityProvider")
}

finality, err := provider.Finality(ctx, stateID)
if err != nil {
return err
}

g.FinalityCheckpoints.
WithLabelValues(stateID, "previous_justified").
Set(float64(finality.PreviousJustified.Epoch))

g.FinalityCheckpoints.
WithLabelValues(stateID, "justified").
Set(float64(finality.Justified.Epoch))

g.FinalityCheckpoints.
WithLabelValues(stateID, "finalized").
Set(float64(finality.Finalized.Epoch))

return nil
}
5 changes: 5 additions & 0 deletions pkg/exporter/consensus/jobs/spec.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
"time"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/spf13/cast"
@@ -276,6 +277,10 @@ func (s *Spec) tick(ctx context.Context) {
}
}

func (s *Spec) HandleEvent(ctx context.Context, event *v1.Event) {

}

func (s *Spec) GetSpec(ctx context.Context) error {
provider, isProvider := s.client.(eth2client.SpecProvider)
if !isProvider {
4 changes: 4 additions & 0 deletions pkg/exporter/consensus/jobs/syncstatus.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"time"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
@@ -94,6 +95,9 @@ func (s *Sync) Start(ctx context.Context) {
}
}

func (s *Sync) HandleEvent(ctx context.Context, event *v1.Event) {
}

func (s *Sync) tick(ctx context.Context) {
if err := s.GetSyncState(ctx); err != nil {
s.log.WithError(err).Error("failed to get sync state")
13 changes: 13 additions & 0 deletions pkg/exporter/consensus/jobs/topics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package jobs

type EventTopic string

const (
EventTopicBlock = "block"
EventTopicHead = "head"
EventTopicAttestation = "attestation"
EventTopicChainReorg = "chain_reorg"
EventTopicFinalizedCheckpoint = "finalized_checkpoint"
EventTopicVoluntaryExit = "voluntary_exit"
EventTopicContributionAndProof = "contribution_and_proof"
)
78 changes: 75 additions & 3 deletions pkg/exporter/consensus/metrics.go
Original file line number Diff line number Diff line change
@@ -2,8 +2,11 @@ package consensus

import (
"context"
"errors"
"time"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/jobs"
"github.com/sirupsen/logrus"
@@ -18,10 +21,14 @@ type Metrics interface {
type metrics struct {
log logrus.FieldLogger

client eth2client.Service

generalMetrics jobs.General
syncMetrics jobs.Sync
specMetrics jobs.Spec
forkMetrics jobs.Forks
beaconMetrics jobs.Beacon
eventMetrics jobs.Event
}

// NewMetrics returns a new metrics object.
@@ -32,17 +39,17 @@ func NewMetrics(client eth2client.Service, log logrus.FieldLogger, nodeName, nam

m := &metrics{
log: log,
client: client,
generalMetrics: jobs.NewGeneralJob(client, log, namespace, constLabels),
specMetrics: jobs.NewSpecJob(client, log, namespace, constLabels),
syncMetrics: jobs.NewSyncJob(client, log, namespace, constLabels),
forkMetrics: jobs.NewForksJob(client, log, namespace, constLabels),
beaconMetrics: jobs.NewBeaconJob(client, log, namespace, constLabels),
eventMetrics: jobs.NewEventJob(client, log, namespace, constLabels),
}

prometheus.MustRegister(m.generalMetrics.Slots)
prometheus.MustRegister(m.generalMetrics.NodeVersion)
prometheus.MustRegister(m.generalMetrics.ReOrgs)
prometheus.MustRegister(m.generalMetrics.ReOrgDepth)
prometheus.MustRegister(m.generalMetrics.FinalityCheckpoints)

prometheus.MustRegister(m.syncMetrics.Percentage)
prometheus.MustRegister(m.syncMetrics.EstimatedHighestSlot)
@@ -79,6 +86,18 @@ func NewMetrics(client eth2client.Service, log logrus.FieldLogger, nodeName, nam
prometheus.MustRegister(m.forkMetrics.Current)
prometheus.MustRegister(m.forkMetrics.Activated)

prometheus.MustRegister(m.beaconMetrics.Attestations)
prometheus.MustRegister(m.beaconMetrics.Deposits)
prometheus.MustRegister(m.beaconMetrics.Slashings)
prometheus.MustRegister(m.beaconMetrics.Transactions)
prometheus.MustRegister(m.beaconMetrics.VoluntaryExits)
prometheus.MustRegister(m.beaconMetrics.Slot)
prometheus.MustRegister(m.beaconMetrics.FinalityCheckpoints)
prometheus.MustRegister(m.beaconMetrics.ReOrgs)
prometheus.MustRegister(m.beaconMetrics.ReOrgDepth)

prometheus.MustRegister(m.eventMetrics.Count)

return m
}

@@ -87,4 +106,57 @@ func (m *metrics) StartAsync(ctx context.Context) {
go m.specMetrics.Start(ctx)
go m.syncMetrics.Start(ctx)
go m.forkMetrics.Start(ctx)
go m.beaconMetrics.Start(ctx)
go m.eventMetrics.Start(ctx)
go m.subscriptionLoop(ctx)
}

func (m *metrics) subscriptionLoop(ctx context.Context) {
subscribed := false

for {
if !subscribed {
if err := m.startSubscriptions(ctx); err != nil {
m.log.Errorf("Failed to subscribe to eth2 node: %v", err)
} else {
subscribed = true
}
}

time.Sleep(5 * time.Second)
}
}

func (m *metrics) startSubscriptions(ctx context.Context) error {
m.log.Info("starting subscriptions")

provider, isProvider := m.client.(eth2client.EventsProvider)
if !isProvider {
return errors.New("client does not implement eth2client.Subscriptions")
}

topics := []string{}

for key, supported := range v1.SupportedEventTopics {
if supported {
topics = append(topics, key)
}
}

if err := provider.Events(ctx, topics, func(event *v1.Event) {
m.handleEvent(ctx, event)
}); err != nil {
return err
}

return nil
}

func (m *metrics) handleEvent(ctx context.Context, event *v1.Event) {
m.generalMetrics.HandleEvent(ctx, event)
m.specMetrics.HandleEvent(ctx, event)
m.syncMetrics.HandleEvent(ctx, event)
m.forkMetrics.HandleEvent(ctx, event)
m.beaconMetrics.HandleEvent(ctx, event)
m.eventMetrics.HandleEvent(ctx, event)
}

0 comments on commit 54fc17d

Please sign in to comment.