Skip to content

Commit

Permalink
Slasher improvements (prysmaticlabs#7061)
Browse files Browse the repository at this point in the history
* Slasher logging and span removal

* merge and reconnect fix

* ivan feedback
  • Loading branch information
shayzluf authored Aug 20, 2020
1 parent 7744c3a commit 2bf1332
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 54 deletions.
75 changes: 28 additions & 47 deletions slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -49,11 +48,17 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) {
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.Canceled, codes.Internal:
stream, err = bs.restartBlockStream(ctx)
err = bs.restartBeaconConnection(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not restart block stream")
return
}
log.Info("Block stream restarted...")
default:
log.WithError(err).Errorf("Could not receive block from beacon node. rpc status: %v", e.Code())
return
Expand Down Expand Up @@ -111,11 +116,17 @@ func (bs *Service) ReceiveAttestations(ctx context.Context) {
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.Canceled, codes.Internal:
stream, err = bs.restartIndexedAttestationStream(ctx)
err = bs.restartBeaconConnection(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not restart attestation stream")
return
}
log.Info("Attestation stream restarted...")
default:
log.WithError(err).Errorf("Could not receive attestations from beacon node. rpc status: %v", e.Code())
return
Expand Down Expand Up @@ -173,61 +184,31 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
}
}

func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) {
func (bs *Service) restartBeaconConnection(ctx context.Context) error {
ticker := time.NewTicker(reconnectPeriod)
for {
select {
case <-ticker.C:
log.Info("Context closed, attempting to restart attestation stream")
conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...)
if err != nil {
log.Debug("Failed to dial beacon node")
if bs.conn.GetState() == connectivity.TransientFailure || bs.conn.GetState() == connectivity.Idle {
log.Debugf("Connection status %v", bs.conn.GetState())
log.Info("Beacon node is still down")
continue
}
log.Debugf("connection status %v", conn.GetState())
if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle {
log.Debug("Beacon node is still down")
continue
}
stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
status, err := bs.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
continue
}
log.Info("Attestation stream restarted...")
return stream, nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return nil, errors.New("context closed, no longer attempting to restart stream")
}
}

}

func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) {
ticker := time.NewTicker(reconnectPeriod)
for {
select {
case <-ticker.C:
log.Info("Context closed, attempting to restart block stream")
conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...)
if err != nil {
log.Debug("Failed to dial beacon node")
if status == nil || status.Syncing {
log.Info("Waiting for beacon node to be fully synced...")
continue
}
log.Debugf("connection status %v", conn.GetState())
if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle {
log.Debug("Beacon node is still down")
continue
}
stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
if err != nil {
continue
}
log.Info("Block stream restarted...")
return stream, nil
log.Info("Beacon node is fully synced")

return nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return nil, errors.New("context closed, no longer attempting to restart stream")
return errors.New("context closed, no longer attempting to restart stream")
}
}

Expand Down
6 changes: 0 additions & 6 deletions slasher/db/kv/spanner_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ func persistFlatSpanMapsOnEviction(db *Store) func(key interface{}, value interf
// Returns span byte array, and error in case of db error.
// returns empty byte array if no entry for this epoch exists in db.
func (db *Store) EpochSpans(ctx context.Context, epoch uint64, fromCache bool) (*types.EpochStore, error) {
ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpans")
defer span.End()

// Get from the cache if it exists or is requested, if not, go to DB.
if fromCache && db.flatSpanCache.Has(epoch) || db.flatSpanCache.Has(epoch) {
spans, _ := db.flatSpanCache.Get(epoch)
Expand Down Expand Up @@ -77,9 +74,6 @@ func (db *Store) EpochSpans(ctx context.Context, epoch uint64, fromCache bool) (

// SaveEpochSpans accepts an epoch and span byte array and writes it to disk.
func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es *types.EpochStore, toCache bool) error {
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochSpans")
defer span.End()

if len(es.Bytes())%int(types.SpannerEncodedLength) != 0 {
return types.ErrWrongSize
}
Expand Down
2 changes: 1 addition & 1 deletion slasher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ var (
SpanCacheSize = &cli.IntFlag{
Name: "spans-cache-size",
Usage: "Sets the span cache size.",
Value: 256,
Value: 1500,
}
)
9 changes: 9 additions & 0 deletions slasher/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/slasher/beaconclient"
"github.com/prysmaticlabs/prysm/slasher/db"
"github.com/prysmaticlabs/prysm/slasher/detection"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -37,6 +38,10 @@ func (ss *Server) IsSlashableAttestation(ctx context.Context, req *ethpb.Indexed
ctx, span := trace.StartSpan(ctx, "detection.IsSlashableAttestation")
defer span.End()

log.WithFields(logrus.Fields{
"slot": req.Data.Slot,
"indices": req.AttestingIndices,
}).Debug("Received attestation via RPC")
if req == nil {
return nil, status.Error(codes.InvalidArgument, "nil request provided")
}
Expand Down Expand Up @@ -116,6 +121,10 @@ func (ss *Server) IsSlashableBlock(ctx context.Context, req *ethpb.SignedBeaconB
ctx, span := trace.StartSpan(ctx, "detection.IsSlashableBlock")
defer span.End()

log.WithFields(logrus.Fields{
"slot": req.Header.Slot,
"proposer_index": req.Header.ProposerIndex,
}).Info("Received block via RPC")
if req == nil {
return nil, status.Error(codes.InvalidArgument, "nil request provided")
}
Expand Down

0 comments on commit 2bf1332

Please sign in to comment.