Skip to content

Commit

Permalink
Historical detection fix (prysmaticlabs#6455)
Browse files Browse the repository at this point in the history
* Historical detection before realtime detection

* comment fixes

* remove logs

* remove logs

* remove logs

* gaz

* handle underflow

* add regressiion test

* update test name

* gaz

* gofmt

* fix comment

Co-authored-by: Ivan Martinez <[email protected]>
Co-authored-by: Raul Jordan <[email protected]>
  • Loading branch information
3 people authored Jul 7, 2020
1 parent 7f741e4 commit 8ddfde4
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 16 deletions.
12 changes: 6 additions & 6 deletions slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
// streams when the beacon chain is node does not respond.
var reconnectPeriod = 5 * time.Second

// receiveBlocks starts a gRPC client stream listener to obtain
// ReceiveBlocks starts a gRPC client stream listener to obtain
// blocks from the beacon node. Upon receiving a block, the service
// broadcasts it to a feed for other services in slasher to subscribe to.
func (bs *Service) receiveBlocks(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.receiveBlocks")
func (bs *Service) ReceiveBlocks(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveBlocks")
defer span.End()
stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
if err != nil {
Expand Down Expand Up @@ -81,11 +81,11 @@ func (bs *Service) receiveBlocks(ctx context.Context) {
}
}

// receiveAttestations starts a gRPC client stream listener to obtain
// ReceiveAttestations starts a gRPC client stream listener to obtain
// attestations from the beacon node. Upon receiving an attestation, the service
// broadcasts it to a feed for other services in slasher to subscribe to.
func (bs *Service) receiveAttestations(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.receiveAttestations")
func (bs *Service) ReceiveAttestations(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveAttestations")
defer span.End()
stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions slasher/beaconclient/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestService_ReceiveBlocks(t *testing.T) {
).Do(func() {
cancel()
})
bs.receiveBlocks(ctx)
bs.ReceiveBlocks(ctx)
}

func TestService_ReceiveAttestations(t *testing.T) {
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestService_ReceiveAttestations(t *testing.T) {
).Do(func() {
cancel()
})
bs.receiveAttestations(ctx)
bs.ReceiveAttestations(ctx)
}

func TestService_ReceiveAttestations_Batched(t *testing.T) {
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) {
cancel()
})

go bs.receiveAttestations(ctx)
go bs.ReceiveAttestations(ctx)
bs.receivedAttestationsBuffer <- att
att.Data.Target.Root = []byte("test root 2")
bs.receivedAttestationsBuffer <- att
Expand Down
3 changes: 0 additions & 3 deletions slasher/beaconclient/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,4 @@ func (bs *Service) Start() {
go bs.subscribeDetectedProposerSlashings(bs.ctx, bs.proposerSlashingsChan)
go bs.subscribeDetectedAttesterSlashings(bs.ctx, bs.attesterSlashingsChan)

// We listen to a stream of blocks and attestations from the beacon node.
go bs.receiveBlocks(bs.ctx)
go bs.receiveAttestations(bs.ctx)
}
5 changes: 5 additions & 0 deletions slasher/cache/flat_span_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,8 @@ func (c *EpochFlatSpansCache) Purge() {
log.Info("Saving all cached data to DB, please wait for completion.")
c.cache.Purge()
}

// Length returns the number of cached items.
func (c *EpochFlatSpansCache) Length() int {
return c.cache.Len()
}
9 changes: 9 additions & 0 deletions slasher/db/kv/spanner_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,12 @@ func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es *types.Epo
return b.Put(bytesutil.Bytes8(epoch), es.Bytes())
})
}

// CacheLength returns the number of cached items.
func (db *Store) CacheLength(ctx context.Context) int {
ctx, span := trace.StartSpan(ctx, "slasherDB.CacheLength")
defer span.End()
len := db.flatSpanCache.Length()
log.Debugf("Span cache length %d", len)
return len
}
1 change: 1 addition & 0 deletions slasher/detection/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_test(
srcs = ["spanner_test.go"],
embed = [":go_default_library"],
deps = [
"//shared/featureconfig:go_default_library",
"//shared/sliceutil:go_default_library",
"//slasher/db/testing:go_default_library",
"//slasher/db/types:go_default_library",
Expand Down
8 changes: 6 additions & 2 deletions slasher/detection/attestations/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,12 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte
var spanMap *types.EpochStore
epoch := source - 1
lookbackEpoch := epoch - epochLookback
// prevent underflow
if epoch < epochLookback {
lookbackEpoch = 0
}
untilEpoch := lookbackEpoch
if int(untilEpoch) < 0 || featureconfig.Get().DisableLookback {
if featureconfig.Get().DisableLookback {
untilEpoch = 0
}
var err error
Expand Down Expand Up @@ -268,7 +272,7 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte
indices = append(indices, idx)
}
}
if epoch < lookbackEpoch && dbOrCache == dbTypes.UseCache {
if epoch <= lookbackEpoch && dbOrCache == dbTypes.UseCache {
dbOrCache = dbTypes.UseDB
}
if err := s.slasherDB.SaveEpochSpans(ctx, epoch, spanMap, dbOrCache); err != nil {
Expand Down
45 changes: 45 additions & 0 deletions slasher/detection/attestations/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
testDB "github.com/prysmaticlabs/prysm/slasher/db/testing"
dbTypes "github.com/prysmaticlabs/prysm/slasher/db/types"
Expand Down Expand Up @@ -845,3 +846,47 @@ func TestNewSpanDetector_UpdateSpans(t *testing.T) {
})
}
}

func TestSpanDetector_UpdateMinSpansCheckCacheSize(t *testing.T) {
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{DisableLookback: true})
defer resetCfg()

att := &ethpb.IndexedAttestation{
AttestingIndices: []uint64{0},
Data: &ethpb.AttestationData{
CommitteeIndex: 0,
Source: &ethpb.Checkpoint{
Epoch: 150,
},
Target: &ethpb.Checkpoint{
Epoch: 152,
},
},
Signature: []byte{1, 2},
}

db := testDB.SetupSlasherDB(t, false)
ctx := context.Background()
defer func() {
if err := db.ClearDB(); err != nil {
t.Log(err)
}
}()
defer func() {
if err := db.Close(); err != nil {
t.Log(err)
}
}()

sd := &SpanDetector{
slasherDB: db,
}
if err := sd.updateMinSpan(ctx, att); err != nil {
t.Fatal(err)
}

if len := db.CacheLength(ctx); len != epochLookback {
t.Fatalf("Expected cache length to be equal to epochLookback: %d got: %d", epochLookback, len)
}

}
6 changes: 4 additions & 2 deletions slasher/detection/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ func (ds *Service) Start() {
if featureconfig.Get().EnableHistoricalDetection {
// The detection service runs detection on all historical
// chain data since genesis.
go ds.detectHistoricalChainData(ds.ctx)
ds.detectHistoricalChainData(ds.ctx)
}

// We listen to a stream of blocks and attestations from the beacon node.
go ds.beaconClient.ReceiveBlocks(ds.ctx)
go ds.beaconClient.ReceiveAttestations(ds.ctx)
// We subscribe to incoming blocks from the beacon node via
// our gRPC client to keep detecting slashable offenses.
go ds.detectIncomingBlocks(ds.ctx, ds.blocksChan)
Expand Down

0 comments on commit 8ddfde4

Please sign in to comment.