diff --git a/pkg/exporter/consensus/jobs/beacon.go b/pkg/exporter/consensus/jobs/beacon.go index dd491b1..667c722 100644 --- a/pkg/exporter/consensus/jobs/beacon.go +++ b/pkg/exporter/consensus/jobs/beacon.go @@ -233,6 +233,10 @@ func (b *Beacon) setupSubscriptions(ctx context.Context) error { return err } + if _, err := b.beaconNode.OnFinalizedCheckpoint(ctx, b.handleFinalizedCheckpointEvent); err != nil { + return err + } + return nil } @@ -284,12 +288,6 @@ func (b *Beacon) getInitialData(ctx context.Context) { } } -func (b *Beacon) HandleEvent(ctx context.Context, event *v1.Event) { - if event.Topic == EventTopicFinalizedCheckpoint { - b.handleFinalizedCheckpointEvent(ctx, event) - } -} - func (b *Beacon) handleChainReorg(ctx context.Context, event *v1.ChainReorgEvent) error { b.ReOrgs.Inc() b.ReOrgDepth.Add(float64(event.Depth)) @@ -297,13 +295,10 @@ func (b *Beacon) handleChainReorg(ctx context.Context, event *v1.ChainReorgEvent return nil } -func (b *Beacon) handleFinalizedCheckpointEvent(ctx context.Context, event *v1.Event) { - _, ok := event.Data.(*v1.FinalizedCheckpointEvent) - if !ok { - return - } - +func (b *Beacon) handleFinalizedCheckpointEvent(ctx context.Context, event *v1.FinalizedCheckpointEvent) error { b.updateFinalizedCheckpoint(ctx) + + return nil } func (b *Beacon) updateFinalizedCheckpoint(ctx context.Context) { diff --git a/pkg/exporter/consensus/metrics.go b/pkg/exporter/consensus/metrics.go index 9220502..dc6a243 100644 --- a/pkg/exporter/consensus/metrics.go +++ b/pkg/exporter/consensus/metrics.go @@ -2,11 +2,8 @@ package consensus import ( "context" - "errors" - "time" eth2client "github.com/attestantio/go-eth2-client" - v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon" @@ -144,63 +141,4 @@ func (m *metrics) StartAsync(ctx context.Context) { m.log.Errorf("Failed to start event metrics: %v", err) } }() - - go m.subscriptionLoop(ctx) -} - -func (m *metrics) subscriptionLoop(ctx context.Context) { - subscribed := false - - for { - if subscribed && (time.Since(m.eventMetrics.LastEventTime) > (5 * time.Minute)) { - m.log. - WithField("last_event_time", m.eventMetrics.LastEventTime.Local().String()). - Info("Haven't received any events for 5 minutes, re-subscribing") - - subscribed = false - } - - if !subscribed && m.client != nil { - if err := m.startSubscriptions(ctx); err != nil { - m.log.Errorf("Failed to subscribe to eth2 node: %v", err) - } else { - subscribed = true - } - } - - time.Sleep(60 * time.Second) - } -} - -func (m *metrics) startSubscriptions(ctx context.Context) error { - m.log.Info("starting subscriptions") - - provider, isProvider := m.client.(eth2client.EventsProvider) - if !isProvider { - return errors.New("client does not implement eth2client.Subscriptions") - } - - topics := []string{} - - for key, supported := range v1.SupportedEventTopics { - if key == "contribution_and_proof" { - continue - } - - if supported { - topics = append(topics, key) - } - } - - if err := provider.Events(ctx, topics, func(event *v1.Event) { - m.handleEvent(ctx, event) - }); err != nil { - return err - } - - return nil -} - -func (m *metrics) handleEvent(ctx context.Context, event *v1.Event) { - m.beaconMetrics.HandleEvent(ctx, event) }