From 2bf1332e5c88d0b07cd458f3967ee5df74a8327a Mon Sep 17 00:00:00 2001 From: Shay Zluf Date: Thu, 20 Aug 2020 10:31:16 +0300 Subject: [PATCH] Slasher improvements (#7061) * Slasher loging and span removal * merge and reconnect fix * ivan feedback --- slasher/beaconclient/receivers.go | 75 ++++++++++++------------------- slasher/db/kv/spanner_new.go | 6 --- slasher/flags/flags.go | 2 +- slasher/rpc/server.go | 9 ++++ 4 files changed, 38 insertions(+), 54 deletions(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 6e1cc810d415..26b12ffcab80 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -13,7 +13,6 @@ import ( "github.com/prysmaticlabs/prysm/shared/slotutil" "github.com/sirupsen/logrus" "go.opencensus.io/trace" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/status" @@ -49,11 +48,17 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) { if e, ok := status.FromError(err); ok { switch e.Code() { case codes.Canceled, codes.Internal: - stream, err = bs.restartBlockStream(ctx) + err = bs.restartBeaconConnection(ctx) if err != nil { - log.WithError(err).Error("Could not restart stream") + log.WithError(err).Error("Could not restart beacon connection") return } + stream, err = bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{}) + if err != nil { + log.WithError(err).Error("Could not restart block stream") + return + } + log.Info("Block stream restarted...") default: log.WithError(err).Errorf("Could not receive block from beacon node. rpc status: %v", e.Code()) return @@ -111,11 +116,17 @@ func (bs *Service) ReceiveAttestations(ctx context.Context) { if e, ok := status.FromError(err); ok { switch e.Code() { case codes.Canceled, codes.Internal: - stream, err = bs.restartIndexedAttestationStream(ctx) + err = bs.restartBeaconConnection(ctx) if err != nil { - log.WithError(err).Error("Could not restart stream") + log.WithError(err).Error("Could not restart beacon connection") return } + stream, err = bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{}) + if err != nil { + log.WithError(err).Error("Could not restart attestation stream") + return + } + log.Info("Attestation stream restarted...") default: log.WithError(err).Errorf("Could not receive attestations from beacon node. rpc status: %v", e.Code()) return @@ -173,61 +184,31 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) { } } -func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) { +func (bs *Service) restartBeaconConnection(ctx context.Context) error { ticker := time.NewTicker(reconnectPeriod) for { select { case <-ticker.C: - log.Info("Context closed, attempting to restart attestation stream") - conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...) - if err != nil { - log.Debug("Failed to dial beacon node") + if bs.conn.GetState() == connectivity.TransientFailure || bs.conn.GetState() == connectivity.Idle { + log.Debugf("Connection status %v", bs.conn.GetState()) + log.Info("Beacon node is still down") continue } - log.Debugf("connection status %v", conn.GetState()) - if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle { - log.Debug("Beacon node is still down") - continue - } - stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{}) + status, err := bs.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{}) if err != nil { + log.WithError(err).Error("Could not fetch sync status") continue } - log.Info("Attestation stream restarted...") - return stream, nil - case <-ctx.Done(): - log.Debug("Context closed, exiting reconnect routine") - return nil, errors.New("context closed, no longer attempting to restart stream") - } - } - -} - -func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) { - ticker := time.NewTicker(reconnectPeriod) - for { - select { - case <-ticker.C: - log.Info("Context closed, attempting to restart block stream") - conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...) - if err != nil { - log.Debug("Failed to dial beacon node") + if status == nil || status.Syncing { + log.Info("Waiting for beacon node to be fully synced...") continue } - log.Debugf("connection status %v", conn.GetState()) - if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle { - log.Debug("Beacon node is still down") - continue - } - stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{}) - if err != nil { - continue - } - log.Info("Block stream restarted...") - return stream, nil + log.Info("Beacon node is fully synced") + + return nil case <-ctx.Done(): log.Debug("Context closed, exiting reconnect routine") - return nil, errors.New("context closed, no longer attempting to restart stream") + return errors.New("context closed, no longer attempting to restart stream") } } diff --git a/slasher/db/kv/spanner_new.go b/slasher/db/kv/spanner_new.go index ed778729bb0a..f58a79d5b8aa 100644 --- a/slasher/db/kv/spanner_new.go +++ b/slasher/db/kv/spanner_new.go @@ -46,9 +46,6 @@ func persistFlatSpanMapsOnEviction(db *Store) func(key interface{}, value interf // Returns span byte array, and error in case of db error. // returns empty byte array if no entry for this epoch exists in db. func (db *Store) EpochSpans(ctx context.Context, epoch uint64, fromCache bool) (*types.EpochStore, error) { - ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpans") - defer span.End() - // Get from the cache if it exists or is requested, if not, go to DB. if fromCache && db.flatSpanCache.Has(epoch) || db.flatSpanCache.Has(epoch) { spans, _ := db.flatSpanCache.Get(epoch) @@ -77,9 +74,6 @@ func (db *Store) EpochSpans(ctx context.Context, epoch uint64, fromCache bool) ( // SaveEpochSpans accepts a epoch and span byte array and writes it to disk. func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es *types.EpochStore, toCache bool) error { - ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochSpans") - defer span.End() - if len(es.Bytes())%int(types.SpannerEncodedLength) != 0 { return types.ErrWrongSize } diff --git a/slasher/flags/flags.go b/slasher/flags/flags.go index b188ce04d1bc..e942416b881e 100644 --- a/slasher/flags/flags.go +++ b/slasher/flags/flags.go @@ -55,6 +55,6 @@ var ( SpanCacheSize = &cli.IntFlag{ Name: "spans-cache-size", Usage: "Sets the span cache size.", - Value: 256, + Value: 1500, } ) diff --git a/slasher/rpc/server.go b/slasher/rpc/server.go index b18e4a6aaa22..4f3f8b1c3f50 100644 --- a/slasher/rpc/server.go +++ b/slasher/rpc/server.go @@ -15,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/slasher/beaconclient" "github.com/prysmaticlabs/prysm/slasher/db" "github.com/prysmaticlabs/prysm/slasher/detection" + "github.com/sirupsen/logrus" "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -37,6 +38,10 @@ func (ss *Server) IsSlashableAttestation(ctx context.Context, req *ethpb.Indexed ctx, span := trace.StartSpan(ctx, "detection.IsSlashableAttestation") defer span.End() + log.WithFields(logrus.Fields{ + "slot": req.Data.Slot, + "indices": req.AttestingIndices, + }).Debug("Received attestation via RPC") if req == nil { return nil, status.Error(codes.InvalidArgument, "nil request provided") } @@ -116,6 +121,10 @@ func (ss *Server) IsSlashableBlock(ctx context.Context, req *ethpb.SignedBeaconB ctx, span := trace.StartSpan(ctx, "detection.IsSlashableBlock") defer span.End() + log.WithFields(logrus.Fields{ + "slot": req.Header.Slot, + "proposer_index": req.Header.ProposerIndex, + }).Info("Received block via RPC") if req == nil { return nil, status.Error(codes.InvalidArgument, "nil request provided") }