Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Use RLNC for block propagation #14813

Draft
wants to merge 29 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7043dac
add committer
potuz Jan 16, 2025
a32ed2e
Add chunks and message verifying
potuz Jan 17, 2025
f51132f
Add echelon form and addRow
potuz Jan 18, 2025
799bd33
Add node, receive and prepare message functions
potuz Jan 18, 2025
f46eaa8
Add decoding and tests.
potuz Jan 19, 2025
8b905d2
Add feature flag
potuz Jan 19, 2025
d93ab44
Handle incoming chunks on the P2P layer
potuz Jan 20, 2025
303ac87
Add a cache for incoming chunks
potuz Jan 20, 2025
044456a
prune the chunk cache every 10 minutes
potuz Jan 20, 2025
8bb3c38
Handle the block when it can be recovered
potuz Jan 21, 2025
9a3bcb8
Add chunk broadcasting
potuz Jan 21, 2025
7a00a9c
Add Ristretto trusted setup
potuz Jan 21, 2025
9ca832b
remove signature check
potuz Jan 22, 2025
186974f
Add endpoint to propose blocks
potuz Jan 22, 2025
2771bfc
Add the signatures over the commitments to the block before processin…
potuz Jan 23, 2025
50e327c
Do not check blob signature
potuz Jan 23, 2025
e4075b6
Implement RLNC Broadcasting (#14830)
nisdas Jan 27, 2025
ca2a20e
Fix Validator Config
nisdas Jan 27, 2025
c2073df
fix block encoding when chunking
potuz Jan 27, 2025
51b86ab
Broadcast chunked blocks
potuz Jan 27, 2025
e593797
add log when decoding a block
potuz Jan 27, 2025
84051f4
broadcast chunks on gossip
potuz Jan 27, 2025
f8ddcb8
Fix Interface Assertion
nisdas Jan 28, 2025
cc5681b
Build Reproducible Test
nisdas Jan 28, 2025
b6f7603
Fix chunking
potuz Jan 28, 2025
8a6d74d
Use SignedBeaconBlock
nisdas Jan 28, 2025
1220e07
send block feed when received via chunks
potuz Jan 29, 2025
a90cdd8
create new context to process blocks
potuz Jan 29, 2025
82e74cf
do not verify block signatures over RPC
potuz Jan 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ go_library(
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync/rlnc:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
Expand Down
9 changes: 9 additions & 0 deletions beacon-chain/blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/rlnc"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)

Expand Down Expand Up @@ -69,6 +70,14 @@ func WithDepositCache(c cache.DepositCache) Option {
}
}

// WithChunkCommitter for chunk committer.
func WithChunkCommitter(c *rlnc.Committer) Option {
return func(s *Service) error {
s.cfg.ChunkCommitter = c
return nil
}
}

// WithPayloadIDCache for payload ID cache.
func WithPayloadIDCache(c *cache.PayloadIDCache) Option {
return func(s *Service) error {
Expand Down
18 changes: 0 additions & 18 deletions beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,24 +651,6 @@ func TestOnBlock_NilBlock(t *testing.T) {
require.Equal(t, true, IsInvalidBlock(err))
}

func TestOnBlock_InvalidSignature(t *testing.T) {
service, tr := minimalTestService(t)
ctx := tr.ctx

gs, keys := util.DeterministicGenesisState(t, 32)
require.NoError(t, service.saveGenesisData(ctx, gs))

blk, err := util.GenerateFullBlock(gs, keys, util.DefaultBlockGenConfig(), 1)
require.NoError(t, err)
blk.Signature = []byte{'a'} // Mutate the signature.
wsb, err := consensusblocks.NewSignedBeaconBlock(blk)
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
_, err = service.validateStateTransition(ctx, preState, wsb)
require.Equal(t, true, IsInvalidBlock(err))
}

func TestOnBlock_CallNewPayloadAndForkchoiceUpdated(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring already synced block")
return nil
}
if s.BlockBeingSynced(blockRoot) {
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring already syncing block")
return nil
}
receivedTime := time.Now()
s.blockBeingSynced.set(blockRoot)
defer s.blockBeingSynced.unset(blockRoot)
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/rlnc"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand Down Expand Up @@ -92,6 +93,7 @@ type config struct {
FinalizedStateAtStartUp state.BeaconState
ExecutionEngineCaller execution.EngineCaller
SyncChecker Checker
ChunkCommitter *rlnc.Committer
}

// Checker is an interface used to determine if a node is in initial sync
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
return nil
}

func (mb *mockBroadcaster) BroadcastBlockChunks(_ context.Context, _ []*ethpb.BeaconBlockChunk) error {
mb.broadcastCalled = true
return nil
}

func (mb *mockBroadcaster) BroadcastAttestation(_ context.Context, _ uint64, _ ethpb.Att) error {
mb.broadcastCalled = true
return nil
Expand Down
49 changes: 0 additions & 49 deletions beacon-chain/core/blocks/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,52 +291,3 @@ func TestProcessBlockHeader_OK(t *testing.T) {
}
assert.Equal(t, true, proto.Equal(nsh, expected), "Expected %v, received %v", expected, nsh)
}

func TestBlockSignatureSet_OK(t *testing.T) {
validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount)
for i := 0; i < len(validators); i++ {
validators[i] = &ethpb.Validator{
PublicKey: make([]byte, 32),
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Slashed: true,
}
}

state, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, state.SetValidators(validators))
require.NoError(t, state.SetSlot(10))
require.NoError(t, state.SetLatestBlockHeader(util.HydrateBeaconHeader(&ethpb.BeaconBlockHeader{
Slot: 9,
ProposerIndex: 0,
})))

latestBlockSignedRoot, err := state.LatestBlockHeader().HashTreeRoot()
require.NoError(t, err)

currentEpoch := time.CurrentEpoch(state)
priv, err := bls.RandKey()
require.NoError(t, err)
pID, err := helpers.BeaconProposerIndex(context.Background(), state)
require.NoError(t, err)
block := util.NewBeaconBlock()
block.Block.Slot = 10
block.Block.ProposerIndex = pID
block.Block.Body.RandaoReveal = bytesutil.PadTo([]byte{'A', 'B', 'C'}, 96)
block.Block.ParentRoot = latestBlockSignedRoot[:]
block.Signature, err = signing.ComputeDomainAndSign(state, currentEpoch, block.Block, params.BeaconConfig().DomainBeaconProposer, priv)
require.NoError(t, err)
proposerIdx, err := helpers.BeaconProposerIndex(context.Background(), state)
require.NoError(t, err)
validators[proposerIdx].Slashed = false
validators[proposerIdx].PublicKey = priv.PublicKey().Marshal()
err = state.UpdateValidatorAtIndex(proposerIdx, validators[proposerIdx])
require.NoError(t, err)
set, err := blocks.BlockSignatureBatch(state, block.Block.ProposerIndex, block.Signature, block.Block.HashTreeRoot)
require.NoError(t, err)

verified, err := set.Verify()
require.NoError(t, err)
assert.Equal(t, true, verified, "Block signature set returned a set which was unable to be verified")
}
18 changes: 0 additions & 18 deletions beacon-chain/core/blocks/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,6 @@ func VerifyBlockSignatureUsingCurrentFork(beaconState state.ReadOnlyBeaconState,
})
}

// BlockSignatureBatch retrieves the block signature batch from the provided block and its corresponding state.
func BlockSignatureBatch(beaconState state.ReadOnlyBeaconState,
proposerIndex primitives.ValidatorIndex,
sig []byte,
rootFunc func() ([32]byte, error)) (*bls.SignatureBatch, error) {
currentEpoch := slots.ToEpoch(beaconState.Slot())
domain, err := signing.Domain(beaconState.Fork(), currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return nil, err
}
proposer, err := beaconState.ValidatorAtIndex(proposerIndex)
if err != nil {
return nil, err
}
proposerPubKey := proposer.PublicKey
return signing.BlockSignatureBatch(proposerPubKey, sig, domain, rootFunc)
}

// RandaoSignatureBatch retrieves the relevant randao specific signature batch object
// from a block and its corresponding state.
func RandaoSignatureBatch(
Expand Down
17 changes: 17 additions & 0 deletions beacon-chain/core/chunks/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["signature.go"],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/chunks",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//network/forks:go_default_library",
"//time/slots:go_default_library",
],
)
53 changes: 53 additions & 0 deletions beacon-chain/core/chunks/signature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package chunks

import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)

// VerifyChunkSignature verifies the proposer signature of a beacon block chunk.
func VerifyChunkSignature(beaconState state.ReadOnlyBeaconState,
proposerIndex primitives.ValidatorIndex,
sig []byte,
rootFunc func() ([32]byte, error)) error {
currentEpoch := slots.ToEpoch(beaconState.Slot())
domain, err := signing.Domain(beaconState.Fork(), currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return err
}
proposer, err := beaconState.ValidatorAtIndex(proposerIndex)
if err != nil {
return err
}
proposerPubKey := proposer.PublicKey
return signing.VerifyBlockSigningRoot(proposerPubKey, sig, domain, rootFunc)
}

// VerifyChunkSignatureUsingCurrentFork verifies the proposer signature of a beacon block chunk. This differs
// from the above method by not using fork data from the state and instead retrieving it
// via the respective epoch.
func VerifyChunkSignatureUsingCurrentFork(beaconState state.ReadOnlyBeaconState, chunk interfaces.ReadOnlyBeaconBlockChunk) error {
currentEpoch := slots.ToEpoch(chunk.Slot())
fork, err := forks.Fork(currentEpoch)
if err != nil {
return err
}
domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return err
}
proposer, err := beaconState.ValidatorAtIndex(chunk.ProposerIndex())
if err != nil {
return err
}
proposerPubKey := proposer.PublicKey
sig := chunk.Signature()
return signing.VerifyBlockSigningRoot(proposerPubKey, sig[:], domain, func() ([32]byte, error) {
return chunk.HeaderRoot(), nil
})
}
8 changes: 1 addition & 7 deletions beacon-chain/core/transition/transition_no_verify_sig.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,6 @@ func ProcessBlockNoVerifyAnySig(
return nil, nil, err
}

sig := signed.Signature()
bSet, err := b.BlockSignatureBatch(st, blk.ProposerIndex(), sig[:], blk.HashTreeRoot)
if err != nil {
tracing.AnnotateError(span, err)
return nil, nil, errors.Wrap(err, "could not retrieve block signature set")
}
randaoReveal := signed.Block().Body().RandaoReveal()
rSet, err := b.RandaoSignatureBatch(ctx, st, randaoReveal[:])
if err != nil {
Expand All @@ -199,7 +193,7 @@ func ProcessBlockNoVerifyAnySig(

// Merge beacon block, randao and attestations signatures into a set.
set := bls.NewSet()
set.Join(bSet).Join(rSet).Join(aSet)
set.Join(rSet).Join(aSet)

if blk.Version() >= version.Capella {
changes, err := signed.Block().Body().BLSToExecutionChanges()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,8 @@ func TestProcessBlockNoVerify_SigSetContainsDescriptions(t *testing.T) {
set, _, err := transition.ProcessBlockNoVerifyAnySig(context.Background(), beaconState, wsb)
require.NoError(t, err)
assert.Equal(t, len(set.Signatures), len(set.Descriptions), "Signatures and descriptions do not match up")
assert.Equal(t, "block signature", set.Descriptions[0])
assert.Equal(t, "randao signature", set.Descriptions[1])
assert.Equal(t, "attestation signature", set.Descriptions[2])
assert.Equal(t, "randao signature", set.Descriptions[0])
assert.Equal(t, "attestation signature", set.Descriptions[1])
}

func TestProcessOperationsNoVerifyAttsSigs_OK(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"//beacon-chain/sync/checkpoint:go_default_library",
"//beacon-chain/sync/genesis:go_default_library",
"//beacon-chain/sync/initial-sync:go_default_library",
"//beacon-chain/sync/rlnc:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
Expand Down
11 changes: 11 additions & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/checkpoint"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/genesis"
initialsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/initial-sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/rlnc"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
Expand Down Expand Up @@ -121,6 +122,7 @@ type BeaconNode struct {
BlobStorageOptions []filesystem.BlobStorageOption
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
chunkCommitter *rlnc.Committer
}

// New creates a new node instance, sets up configuration options, and registers
Expand All @@ -136,6 +138,11 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
registry := runtime.NewServiceRegistry()
ctx := cliCtx.Context

committer, err := rlnc.LoadTrustedSetup()
if err != nil {
return nil, errors.Wrap(err, "could not load the committer trusted setup")
}

beacon := &BeaconNode{
cliCtx: cliCtx,
ctx: ctx,
Expand All @@ -158,6 +165,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
serviceFlagOpts: &serviceFlagOpts{},
initialSyncComplete: make(chan struct{}),
syncChecker: &initialsync.SyncChecker{},
chunkCommitter: committer,
}

for _, opt := range opts {
Expand Down Expand Up @@ -751,6 +759,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithBlobStorage(b.BlobStorage),
blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache),
blockchain.WithPayloadIDCache(b.payloadIDCache),
blockchain.WithChunkCommitter(b.chunkCommitter),
blockchain.WithSyncChecker(b.syncChecker),
)

Expand Down Expand Up @@ -836,6 +845,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithBlobStorage(b.BlobStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
regularsync.WithChunkCommitter(b.chunkCommitter),
)
return b.services.RegisterService(rs)
}
Expand Down Expand Up @@ -982,6 +992,7 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
BlobStorage: b.BlobStorage,
TrackedValidatorsCache: b.trackedValidatorsCache,
PayloadIDCache: b.payloadIDCache,
ChunkCommitter: b.chunkCommitter,
})

return b.services.RegisterService(rpcService)
Expand Down
31 changes: 31 additions & 0 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
Expand Down Expand Up @@ -228,6 +229,36 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
return nil
}

// BroadcastBlockChunks sends the passed messages to the pubsub topic
func (s *Service) BroadcastBlockChunks(ctx context.Context, chunks []*ethpb.BeaconBlockChunk) error {
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
defer span.End()
forkDigest, err := s.currentForkDigest()
if err != nil {
err := errors.Wrap(err, "could not retrieve fork digest")
tracing.AnnotateError(span, err)
return err
}
topic := RLNCTopicFormat
topic = fmt.Sprintf(topic, forkDigest)
multipleMessages := make([][]byte, len(chunks))
for i, c := range chunks {
buf := new(bytes.Buffer)
if _, err := s.Encoding().EncodeGossip(buf, c); err != nil {
err := errors.Wrap(err, "could not encode message")
tracing.AnnotateError(span, err)
return err
}
multipleMessages[i] = buf.Bytes()
}
if err := s.PublishMultipleToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), multipleMessages, pubsub.WithRandomPublishing()); err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err
}
return nil
}

func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/encoder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
visibility = [
"//beacon-chain:__subpackages__",
"//cmd:__subpackages__",
"//validator:__subpackages__",
],
deps = [
"//config/params:go_default_library",
Expand Down
Loading
Loading