diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index ee324733d20d..4d03f6a9c280 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -21,11 +21,11 @@ import ( // streams when the beacon chain is node does not respond. var reconnectPeriod = 5 * time.Second -// receiveBlocks starts a gRPC client stream listener to obtain +// ReceiveBlocks starts a gRPC client stream listener to obtain // blocks from the beacon node. Upon receiving a block, the service // broadcasts it to a feed for other services in slasher to subscribe to. -func (bs *Service) receiveBlocks(ctx context.Context) { - ctx, span := trace.StartSpan(ctx, "beaconclient.receiveBlocks") +func (bs *Service) ReceiveBlocks(ctx context.Context) { + ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveBlocks") defer span.End() stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{}) if err != nil { @@ -81,11 +81,11 @@ func (bs *Service) receiveBlocks(ctx context.Context) { } } -// receiveAttestations starts a gRPC client stream listener to obtain +// ReceiveAttestations starts a gRPC client stream listener to obtain // attestations from the beacon node. Upon receiving an attestation, the service // broadcasts it to a feed for other services in slasher to subscribe to. -func (bs *Service) receiveAttestations(ctx context.Context) { - ctx, span := trace.StartSpan(ctx, "beaconclient.receiveAttestations") +func (bs *Service) ReceiveAttestations(ctx context.Context) { + ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveAttestations") defer span.End() stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{}) if err != nil { diff --git a/slasher/beaconclient/receivers_test.go b/slasher/beaconclient/receivers_test.go index 25f61e4ccf05..11fc54bb49d8 100644 --- a/slasher/beaconclient/receivers_test.go +++ b/slasher/beaconclient/receivers_test.go @@ -36,7 +36,7 @@ func TestService_ReceiveBlocks(t *testing.T) { ).Do(func() { cancel() }) - bs.receiveBlocks(ctx) + bs.ReceiveBlocks(ctx) } func TestService_ReceiveAttestations(t *testing.T) { @@ -68,7 +68,7 @@ func TestService_ReceiveAttestations(t *testing.T) { ).Do(func() { cancel() }) - bs.receiveAttestations(ctx) + bs.ReceiveAttestations(ctx) } func TestService_ReceiveAttestations_Batched(t *testing.T) { @@ -110,7 +110,7 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) { cancel() }) - go bs.receiveAttestations(ctx) + go bs.ReceiveAttestations(ctx) bs.receivedAttestationsBuffer <- att att.Data.Target.Root = []byte("test root 2") bs.receivedAttestationsBuffer <- att diff --git a/slasher/beaconclient/service.go b/slasher/beaconclient/service.go index 11960efcfb1b..c4eb55cf0df6 100644 --- a/slasher/beaconclient/service.go +++ b/slasher/beaconclient/service.go @@ -191,7 +191,4 @@ func (bs *Service) Start() { go bs.subscribeDetectedProposerSlashings(bs.ctx, bs.proposerSlashingsChan) go bs.subscribeDetectedAttesterSlashings(bs.ctx, bs.attesterSlashingsChan) - // We listen to a stream of blocks and attestations from the beacon node. - go bs.receiveBlocks(bs.ctx) - go bs.receiveAttestations(bs.ctx) } diff --git a/slasher/cache/flat_span_cache.go b/slasher/cache/flat_span_cache.go index 0e2fe44bfb3a..93d49f78cf33 100644 --- a/slasher/cache/flat_span_cache.go +++ b/slasher/cache/flat_span_cache.go @@ -65,3 +65,8 @@ func (c *EpochFlatSpansCache) Purge() { log.Info("Saving all cached data to DB, please wait for completion.") c.cache.Purge() } + +// Length returns the number of cached items. +func (c *EpochFlatSpansCache) Length() int { + return c.cache.Len() +} diff --git a/slasher/db/kv/spanner_new.go b/slasher/db/kv/spanner_new.go index 8bbcf4fd3b4f..ed778729bb0a 100644 --- a/slasher/db/kv/spanner_new.go +++ b/slasher/db/kv/spanner_new.go @@ -100,3 +100,12 @@ func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es *types.Epo return b.Put(bytesutil.Bytes8(epoch), es.Bytes()) }) } + +// CacheLength returns the number of cached items. +func (db *Store) CacheLength(ctx context.Context) int { + ctx, span := trace.StartSpan(ctx, "slasherDB.CacheLength") + defer span.End() + len := db.flatSpanCache.Length() + log.Debugf("Span cache length %d", len) + return len +} diff --git a/slasher/detection/attestations/BUILD.bazel b/slasher/detection/attestations/BUILD.bazel index 49dbac50dd1b..b7ff92c98299 100644 --- a/slasher/detection/attestations/BUILD.bazel +++ b/slasher/detection/attestations/BUILD.bazel @@ -29,6 +29,7 @@ go_test( srcs = ["spanner_test.go"], embed = [":go_default_library"], deps = [ + "//shared/featureconfig:go_default_library", "//shared/sliceutil:go_default_library", "//slasher/db/testing:go_default_library", "//slasher/db/types:go_default_library", diff --git a/slasher/detection/attestations/spanner.go b/slasher/detection/attestations/spanner.go index 75c1891d6d40..002cab9cd385 100644 --- a/slasher/detection/attestations/spanner.go +++ b/slasher/detection/attestations/spanner.go @@ -233,8 +233,12 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte var spanMap *types.EpochStore epoch := source - 1 lookbackEpoch := epoch - epochLookback + // prevent underflow + if epoch < epochLookback { + lookbackEpoch = 0 + } untilEpoch := lookbackEpoch - if int(untilEpoch) < 0 || featureconfig.Get().DisableLookback { + if featureconfig.Get().DisableLookback { untilEpoch = 0 } var err error @@ -268,7 +272,7 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte indices = append(indices, idx) } } - if epoch < lookbackEpoch && dbOrCache == dbTypes.UseCache { + if epoch <= lookbackEpoch && dbOrCache == dbTypes.UseCache { dbOrCache = dbTypes.UseDB } if err := s.slasherDB.SaveEpochSpans(ctx, epoch, spanMap, dbOrCache); err != nil { diff --git a/slasher/detection/attestations/spanner_test.go b/slasher/detection/attestations/spanner_test.go index 66c9fea3a8ff..aa80a3f35e78 100644 --- a/slasher/detection/attestations/spanner_test.go +++ b/slasher/detection/attestations/spanner_test.go @@ -6,6 +6,7 @@ import ( "testing" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/sliceutil" testDB "github.com/prysmaticlabs/prysm/slasher/db/testing" dbTypes "github.com/prysmaticlabs/prysm/slasher/db/types" @@ -845,3 +846,47 @@ func TestNewSpanDetector_UpdateSpans(t *testing.T) { }) } } + +func TestSpanDetector_UpdateMinSpansCheckCacheSize(t *testing.T) { + resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{DisableLookback: true}) + defer resetCfg() + + att := ðpb.IndexedAttestation{ + AttestingIndices: []uint64{0}, + Data: ðpb.AttestationData{ + CommitteeIndex: 0, + Source: ðpb.Checkpoint{ + Epoch: 150, + }, + Target: ðpb.Checkpoint{ + Epoch: 152, + }, + }, + Signature: []byte{1, 2}, + } + + db := testDB.SetupSlasherDB(t, false) + ctx := context.Background() + defer func() { + if err := db.ClearDB(); err != nil { + t.Log(err) + } + }() + defer func() { + if err := db.Close(); err != nil { + t.Log(err) + } + }() + + sd := &SpanDetector{ + slasherDB: db, + } + if err := sd.updateMinSpan(ctx, att); err != nil { + t.Fatal(err) + } + + if len := db.CacheLength(ctx); len != epochLookback { + t.Fatalf("Expected cache length to be equal to epochLookback: %d got: %d", epochLookback, len) + } + +} diff --git a/slasher/detection/service.go b/slasher/detection/service.go index e23f0226a55d..bdd3f2c2c680 100644 --- a/slasher/detection/service.go +++ b/slasher/detection/service.go @@ -88,9 +88,11 @@ func (ds *Service) Start() { if featureconfig.Get().EnableHistoricalDetection { // The detection service runs detection on all historical // chain data since genesis. - go ds.detectHistoricalChainData(ds.ctx) + ds.detectHistoricalChainData(ds.ctx) } - + // We listen to a stream of blocks and attestations from the beacon node. + go ds.beaconClient.ReceiveBlocks(ds.ctx) + go ds.beaconClient.ReceiveAttestations(ds.ctx) // We subscribe to incoming blocks from the beacon node via // our gRPC client to keep detecting slashable offenses. go ds.detectIncomingBlocks(ds.ctx, ds.blocksChan)