Skip to content

Commit

Permalink
Batch Block Roots Requesting (prysmaticlabs#7027)
Browse files Browse the repository at this point in the history
* checkpoint
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq
* test
* gaz
* fix test
* comment
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq
* terence's review
* Merge branch 'multipleBranchProcessing' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq
  • Loading branch information
nisdas authored Aug 20, 2020
1 parent ba5da21 commit 7744c3a
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 54 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const maxBadResponses = 5
const cacheNumCounters, cacheMaxCost, cacheBufferItems = 1000, 1000, 64

// maxDialTimeout is the timeout for a single peer dial.
const maxDialTimeout = 30 * time.Second
var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout

// Service for managing peer to peer (p2p) networking.
type Service struct {
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ go_test(
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/rand:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/testutil:go_default_library",
"//shared/testutil/assert:go_default_library",
Expand Down
36 changes: 3 additions & 33 deletions beacon-chain/sync/pending_attestations_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package sync
import (
"context"
"encoding/hex"
"io"
"sync"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bls"
Expand All @@ -17,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
Expand Down Expand Up @@ -47,8 +44,6 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
defer span.End()

pids := s.p2p.Peers().Connected()

// Before a node processes pending attestations queue, it verifies
// the attestations in the queue are still valid. Attestations will
// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
Expand All @@ -61,6 +56,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
}
s.pendingAttsLock.RUnlock()

pendingRoots := [][32]byte{}
randGen := rand.NewGenerator()
for _, bRoot := range roots {
s.pendingAttsLock.RLock()
Expand Down Expand Up @@ -123,36 +119,10 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
"attCount": len(attestations),
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
}).Debug("Requesting block for pending attestation")

// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer or equal to the pending attestation's target boundary slot.
// If there are no peer id's available, then we should exit from this function. The function will
// be run again periodically, and there may be peers available in future runs.
if len(pids) == 0 {
log.Debug("No peer IDs available to request missing block from for pending attestation")
return nil
}
pid := pids[randGen.Int()%len(pids)]
targetSlot := helpers.SlotToEpoch(attestations[0].Message.Aggregate.Data.Target.Epoch)
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "could not get chain state for peer")
}
if cs != nil && cs.HeadSlot >= targetSlot {
pid = p
break
}
}

req := [][32]byte{bRoot}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil && err == io.EOF {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
pendingRoots = append(pendingRoots, bRoot)
}
}
return nil
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}

// This defines how pending attestations is saved in the map. The key is the
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/pending_attestations_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
r := &Service{
p2p: p1,
db: db,
chain: &mock.ChainService{Genesis: roughtime.Now()},
chain: &mock.ChainService{Genesis: roughtime.Now(), FinalizedCheckPoint: &ethpb.Checkpoint{}},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateSummaryCache: cache.NewStateSummaryCache(),
}
Expand Down
69 changes: 50 additions & 19 deletions beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
Expand All @@ -22,6 +23,9 @@ import (

var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */)

const maxPeerRequest = 50
const numOfTries = 5

// processes pending blocks queue on every processPendingBlocksPeriod
func (s *Service) processPendingBlocksQueue() {
ctx := context.Background()
Expand All @@ -46,6 +50,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
return errors.Wrap(err, "could not validate pending slots")
}
slots := s.sortedPendingSlots()
parentRoots := [][32]byte{}

span.AddAttributes(
trace.Int64Attribute("numSlots", int64(len(slots))),
Expand Down Expand Up @@ -110,26 +115,8 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
"currentSlot": b.Block.Slot,
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
}).Info("Requesting parent block")
req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}

// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer than the block slot we are requesting.
pid := pids[randGen.Int()%len(pids)]
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "failed to read chain state for peer")
}
if cs != nil && cs.HeadSlot >= slot {
pid = p
break
}
}
parentRoots = append(parentRoots, bytesutil.ToBytes32(b.Block.ParentRoot))

if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
span.End()
continue
}
Expand Down Expand Up @@ -162,6 +149,50 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
}

return s.sendBatchRootRequest(ctx, parentRoots, randGen)
}

func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, randGen *rand.Rand) error {
ctx, span := trace.StartSpan(ctx, "sendBatchRootRequest")
defer span.End()

if len(roots) == 0 {
return nil
}

_, bestPeers := s.p2p.Peers().BestFinalized(maxPeerRequest, s.chain.FinalizedCheckpt().Epoch)
if len(bestPeers) == 0 {
return nil
}
roots = s.dedupRoots(roots)
// Randomly choose a peer to query from our best peers. If that peer cannot return
// all the requested blocks, we randomly select another peer.
pid := bestPeers[randGen.Int()%len(bestPeers)]
for i := 0; i < numOfTries; i++ {
req := roots
if len(roots) > int(params.BeaconNetworkConfig().MaxRequestBlocks) {
req = roots[:params.BeaconNetworkConfig().MaxRequestBlocks]
}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
newRoots := make([][32]byte, 0, len(roots))
s.pendingQueueLock.RLock()
for _, rt := range roots {
if !s.seenPendingBlocks[rt] {
newRoots = append(newRoots, rt)
}
}
s.pendingQueueLock.RUnlock()
if len(newRoots) == 0 {
break
}
// Choosing a new peer with the leftover set of
// roots to request.
roots = newRoots
pid = bestPeers[randGen.Int()%len(bestPeers)]
}
return nil
}

Expand Down
82 changes: 82 additions & 0 deletions beacon-chain/sync/pending_blocks_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -17,6 +18,8 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
Expand Down Expand Up @@ -278,3 +281,82 @@ func TestService_sortedPendingSlots(t *testing.T) {
want := []uint64{lastSlot - 5, lastSlot - 3, lastSlot - 2, lastSlot}
assert.DeepEqual(t, want, r.sortedPendingSlots(), "Unexpected pending slots list")
}

func TestService_BatchRootRequest(t *testing.T) {
db, _ := dbtest.SetupDB(t)
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")

r := &Service{
p2p: p1,
db: db,
chain: &mock.ChainService{
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 1,
},
},
slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}

err := r.initCaches()
require.NoError(t, err)
p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetChainState(p2.PeerID(), &pb.Status{FinalizedEpoch: 2})

b0 := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
require.NoError(t, r.db.SaveBlock(context.Background(), b0))
b0Root, err := stateutil.BlockRoot(b0.Block)
require.NoError(t, err)
b1 := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1, ParentRoot: b0Root[:]}}
require.NoError(t, r.db.SaveBlock(context.Background(), b1))
b1Root, err := stateutil.BlockRoot(b1.Block)
require.NoError(t, err)

b2 := &ethpb.BeaconBlock{Slot: 2, ParentRoot: b1Root[:]}
b2Root, err := ssz.HashTreeRoot(b2)
require.NoError(t, err)
b5 := &ethpb.BeaconBlock{Slot: 5, ParentRoot: b2Root[:]}
b5Root, err := ssz.HashTreeRoot(b5)
require.NoError(t, err)
b3 := &ethpb.BeaconBlock{Slot: 3, ParentRoot: b0Root[:]}
b3Root, err := ssz.HashTreeRoot(b3)
require.NoError(t, err)
b4 := &ethpb.BeaconBlock{Slot: 4, ParentRoot: b3Root[:]}
b4Root, err := ssz.HashTreeRoot(b4)
require.NoError(t, err)

// Send in duplicated roots to also test deduplicaton.
sentRoots := [][32]byte{b2Root, b2Root, b3Root, b3Root, b4Root, b5Root}
expectedRoots := [][32]byte{b2Root, b3Root, b4Root, b5Root}

pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := [][32]byte{}
assert.NoError(t, p2.Encoding().DecodeWithMaxLength(stream, &out))
assert.DeepEqual(t, expectedRoots, out, "Did not receive expected message")
response := []*ethpb.SignedBeaconBlock{{Block: b2},
{Block: b3}, {Block: b4}, {Block: b5}}
for _, blk := range response {
_, err := stream.Write([]byte{responseCodeSuccess})
assert.NoError(t, err, "Failed to write to stream")
_, err = p2.Encoding().EncodeWithMaxLength(stream, blk)
assert.NoError(t, err, "Could not send response back")
}
assert.NoError(t, stream.Close())
})

require.NoError(t, r.sendBatchRootRequest(context.Background(), sentRoots, rand.NewGenerator()))

if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
assert.Equal(t, 4, len(r.slotToPendingBlocks), "Incorrect size for slot to pending blocks cache")
assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
}
13 changes: 13 additions & 0 deletions beacon-chain/sync/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ func (s *Service) dedupBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][
return newBlks, newRoots, nil
}

func (s *Service) dedupRoots(roots [][32]byte) [][32]byte {
newRoots := make([][32]byte, 0, len(roots))
rootMap := make(map[[32]byte]bool, len(roots))
for i, r := range roots {
if rootMap[r] {
continue
}
rootMap[r] = true
newRoots = append(newRoots, roots[i])
}
return newRoots
}

// sort the provided blocks and roots in ascending order. This method assumes that the size of
// block slice and root slice is equal.
func (s *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) {
Expand Down

0 comments on commit 7744c3a

Please sign in to comment.