From a8a0c1b36daa6f19c6ca614da1f44b96e83d948e Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Sun, 23 Jun 2024 22:57:08 +0200 Subject: [PATCH 1/6] `validateDataColumn`: Add comments and remove debug computation. --- beacon-chain/sync/validate_data_column.go | 66 +++++++++++++++++------ 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go index 3eb53549f2b9..62e344d04772 100644 --- a/beacon-chain/sync/validate_data_column.go +++ b/beacon-chain/sync/validate_data_column.go @@ -23,88 +23,121 @@ import ( "github.com/sirupsen/logrus" ) +// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { receivedTime := prysmTime.Now() + // Always accept our own messages. if pid == s.cfg.p2p.PeerID() { return pubsub.ValidationAccept, nil } + + // Ignore messages during initial sync. if s.cfg.initialSync.Syncing() { return pubsub.ValidationIgnore, nil } + + // Reject messages with a nil topic. if msg.Topic == nil { return pubsub.ValidationReject, errInvalidTopic } + + // Decode the message. m, err := s.decodePubsubMessage(msg) if err != nil { log.WithError(err).Error("Failed to decode message") return pubsub.ValidationReject, err } + // Reject messages that are not of the expected type. ds, ok := m.(*eth.DataColumnSidecar) if !ok { log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar") return pubsub.ValidationReject, errWrongMessage } + + // [REJECT] The sidecar's index is consistent with NUMBER_OF_COLUMNS -- i.e. sidecar.index < NUMBER_OF_COLUMNS. if ds.ColumnIndex >= params.BeaconConfig().NumberOfColumns { return pubsub.ValidationReject, errors.Errorf("invalid column index provided, got %d", ds.ColumnIndex) } + + // [REJECT] The sidecar is for the correct subnet -- i.e. compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id. want := fmt.Sprintf("data_column_sidecar_%d", computeSubnetForColumnSidecar(ds.ColumnIndex)) if !strings.Contains(*msg.Topic, want) { log.Debug("Column Sidecar index does not match topic") return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic) } + + // [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. validate that block_header.slot <= current_slot (a client MAY queue future sidecars for processing at the appropriate slot). if err := slots.VerifyTime(uint64(s.cfg.clock.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot, params.BeaconConfig().MaximumGossipClockDisparityDuration()); err != nil { log.WithError(err).Debug("Ignored sidecar: could not verify slot time") return pubsub.ValidationIgnore, nil } + + // [IGNORE] The sidecar is from a slot greater than the latest finalized slot -- i.e.
validate that block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch) cp := s.cfg.chain.FinalizedCheckpt() startSlot, err := slots.EpochStart(cp.Epoch) if err != nil { log.WithError(err).Debug("Ignored column sidecar: could not calculate epoch start slot") return pubsub.ValidationIgnore, nil } + if startSlot >= ds.SignedBlockHeader.Header.Slot { err := fmt.Errorf("finalized slot %d greater or equal to block slot %d", startSlot, ds.SignedBlockHeader.Header.Slot) log.Debug(err) return pubsub.ValidationIgnore, err } - // Handle sidecar when the parent is unknown. + + // [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen (via both gossip and non-gossip sources) (a client MAY queue sidecars for processing once the parent block is retrieved). if !s.cfg.chain.HasBlock(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot)) { err := errors.Errorf("unknown parent for data column sidecar with slot %d and parent root %#x", ds.SignedBlockHeader.Header.Slot, ds.SignedBlockHeader.Header.ParentRoot) log.WithError(err).Debug("Could not identify parent for data column sidecar") return pubsub.ValidationIgnore, err } + + // [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation. if s.hasBadBlock([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) { bRoot, err := ds.SignedBlockHeader.Header.HashTreeRoot() if err != nil { return pubsub.ValidationIgnore, err } + + // If parent is bad, we set the block as bad. s.setBadBlock(ctx, bRoot) return pubsub.ValidationReject, errors.Errorf("column sidecar with bad parent provided") } + + // [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by block_header.parent_root). parentSlot, err := s.cfg.chain.RecentBlockSlot([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) if err != nil { return pubsub.ValidationIgnore, err } + if ds.SignedBlockHeader.Header.Slot <= parentSlot { return pubsub.ValidationReject, errors.Errorf("invalid column sidecar slot: %d", ds.SignedBlockHeader.Header.Slot) } + + // [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block -- i.e. get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root. if !s.cfg.chain.InForkchoice([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) { return pubsub.ValidationReject, blockchain.ErrNotDescendantOfFinalized } + // [REJECT] The sidecar's kzg_commitments field inclusion proof is valid as verified by verify_data_column_sidecar_inclusion_proof(sidecar). if err := blocks.VerifyKZGInclusionProofColumn(ds); err != nil { return pubsub.ValidationReject, err } + // [REJECT] The sidecar's column data is valid as verified by verify_data_column_sidecar_kzg_proofs(sidecar). verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(ds) if err != nil { return pubsub.ValidationReject, err } + if !verified { return pubsub.ValidationReject, errors.New("failed to verify kzg proof of column") } + + // ????? 
parentState, err := s.cfg.stateGen.StateByRoot(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot)) if err != nil { return pubsub.ValidationIgnore, err @@ -113,35 +146,38 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs if err := coreBlocks.VerifyBlockHeaderSignatureUsingCurrentFork(parentState, ds.SignedBlockHeader); err != nil { return pubsub.ValidationReject, err } - // In the event the block is more than an epoch ahead from its - // parent state, we have to advance the state forward. + + // [REJECT] The proposer signature of sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey. parentRoot := ds.SignedBlockHeader.Header.ParentRoot parentState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, parentState, parentRoot, ds.SignedBlockHeader.Header.Slot) if err != nil { return pubsub.ValidationIgnore, err } + idx, err := helpers.BeaconProposerIndex(ctx, parentState) if err != nil { return pubsub.ValidationIgnore, err } + if ds.SignedBlockHeader.Header.ProposerIndex != idx { return pubsub.ValidationReject, errors.New("incorrect proposer index") } - startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot) - if err != nil { - return pubsub.ValidationIgnore, err + // Add specific debug log. + if logrus.GetLevel() >= logrus.DebugLevel { + // Get the time at slot start. + startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot) + if err == nil { + log.WithFields(logrus.Fields{ + "sinceSlotStartTime": receivedTime.Sub(startTime), + "validationTime": s.cfg.clock.Now().Sub(receivedTime), + "columnIndex": ds.ColumnIndex, + }).Debug("Received data column sidecar") + } else { + log.WithError(err).Error("Failed to calculate slot time") + } } - sinceSlotStartTime := receivedTime.Sub(startTime) - validationTime := s.cfg.clock.Now().Sub(receivedTime) - - log.WithFields(logrus.Fields{ - "sinceSlotStartTime": sinceSlotStartTime, - "validationTime": validationTime, - "columnIndex": ds.ColumnIndex, - }).Debug("Received data column sidecar") - // TODO: Transform this whole function so it looks like to the `validateBlob` // with the tiny verifiers inside. 
roDataColumn, err := blocks.NewRODataColumn(ds) From 079b5c21b2182cc46d4b927df1987d7117968cfd Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Sun, 23 Jun 2024 22:57:45 +0200 Subject: [PATCH 2/6] `sampleDataColumnsFromPeer`: Add KZG verification --- beacon-chain/sync/BUILD.bazel | 3 + beacon-chain/sync/data_columns_sampling.go | 106 +++++++++++----- .../sync/data_columns_sampling_test.go | 118 +++++++++++++----- 3 files changed, 167 insertions(+), 60 deletions(-) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 28f7a7ece7ab..0daa3a7e9335 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -200,6 +200,7 @@ go_test( deps = [ "//async/abool:go_default_library", "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/blockchain/kzg:go_default_library", "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/cache:go_default_library", "//beacon-chain/core/altair:go_default_library", @@ -257,6 +258,8 @@ go_test( "//testing/util:go_default_library", "//time:go_default_library", "//time/slots:go_default_library", + "@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library", + "@com_github_crate_crypto_go_kzg_4844//:go_default_library", "@com_github_d4l3k_messagediff//:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//core/types:go_default_library", diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index 156d608289d1..daef98d29957 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -16,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/crypto/rand" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/runtime/version" @@ -76,8 +77,78 @@ func (s *Service) custodyColumnsFromPeer(pid peer.ID) (map[uint64]bool, error) { return custodyColumns, nil } +// verifyColumn verifies the retrieved column against the root, the index, +// the KZG inclusion and the KZG proof. +func verifyColumn( + roDataColumn blocks.RODataColumn, + root [32]byte, + pid peer.ID, + requestedColumns map[uint64]bool, +) bool { + retrievedColumn := roDataColumn.ColumnIndex + + // Filter out columns with incorrect root. + actualRoot := roDataColumn.BlockRoot() + if actualRoot != root { + log.WithFields(logrus.Fields{ + "peerID": pid, + "requestedRoot": fmt.Sprintf("%#x", root), + "actualRoot": fmt.Sprintf("%#x", actualRoot), + }).Debug("Retrieved root does not match requested root") + + return false + } + + // Filter out columns that were not requested. + if !requestedColumns[retrievedColumn] { + columnsToSampleList := sortedSliceFromMap(requestedColumns) + + log.WithFields(logrus.Fields{ + "peerID": pid, + "requestedColumns": columnsToSampleList, + "retrievedColumn": retrievedColumn, + }).Debug("Retrieved column was not requested") + + return false + } + + // Filter out columns which did not pass the KZG inclusion proof verification. 
+ if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn.DataColumnSidecar); err != nil { + log.WithFields(logrus.Fields{ + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "index": retrievedColumn, + }).Debug("Failed to verify KZG inclusion proof for retrieved column") + + return false + } + + // Filter out columns which did not pass the KZG proof verification. + verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn.DataColumnSidecar) + if err != nil { + log.WithFields(logrus.Fields{ + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "index": retrievedColumn, + }).Debug("Error when verifying KZG proof for retrieved column") + + return false + } + + if !verified { + log.WithFields(logrus.Fields{ + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "index": retrievedColumn, + }).Debug("Failed to verify KZG proof for retrieved column") + + return false + } + + return true +} + // sampleDataColumnsFromPeer samples data columns from a peer. -// It filters out columns that were not requested and columns with incorrect root. // It returns the retrieved columns. func (s *Service) sampleDataColumnsFromPeer( pid peer.ID, @@ -101,39 +172,10 @@ func (s *Service) sampleDataColumnsFromPeer( retrievedColumns := make(map[uint64]bool, len(roDataColumns)) - // Remove retrieved items from rootsByDataColumnIndex. for _, roDataColumn := range roDataColumns { - retrievedColumn := roDataColumn.ColumnIndex - - actualRoot := roDataColumn.BlockRoot() - - // Filter out columns with incorrect root. - if actualRoot != root { - // TODO: Should we decrease the peer score here? - log.WithFields(logrus.Fields{ - "peerID": pid, - "requestedRoot": fmt.Sprintf("%#x", root), - "actualRoot": fmt.Sprintf("%#x", actualRoot), - }).Warning("Actual root does not match requested root") - - continue + if verified := verifyColumn(roDataColumn, root, pid, requestedColumns); verified { + retrievedColumns[roDataColumn.ColumnIndex] = true } - - // Filter out columns that were not requested. - if !requestedColumns[retrievedColumn] { - // TODO: Should we decrease the peer score here? 
- columnsToSampleList := sortedSliceFromMap(requestedColumns) - - log.WithFields(logrus.Fields{ - "peerID": pid, - "requestedColumns": columnsToSampleList, - "retrievedColumn": retrievedColumn, - }).Warning("Retrieved column was not requested") - - continue - } - - retrievedColumns[retrievedColumn] = true } if len(retrievedColumns) == len(requestedColumns) { diff --git a/beacon-chain/sync/data_columns_sampling_test.go b/beacon-chain/sync/data_columns_sampling_test.go index 436cb3068d1b..d818ebd639fb 100644 --- a/beacon-chain/sync/data_columns_sampling_test.go +++ b/beacon-chain/sync/data_columns_sampling_test.go @@ -1,22 +1,30 @@ package sync import ( + "bytes" "context" + "crypto/sha256" + "encoding/binary" "testing" + "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" + GoKZG "github.com/crate-crypto/go-kzg-4844" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" + kzg "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg" mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" - fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/testing/require" + "github.com/prysmaticlabs/prysm/v5/testing/util" + "github.com/sirupsen/logrus" ) func TestRandomizeColumns(t *testing.T) { @@ -52,17 +60,11 @@ func createAndConnectPeer( t *testing.T, p2pService *p2ptest.TestP2P, chainService *mock.ChainService, - header *ethpb.BeaconBlockHeader, + dataColumnSidecars []*ethpb.DataColumnSidecar, custodyCount uint64, columnsNotToRespond map[uint64]bool, offset int, ) { - emptyRoot := [fieldparams.RootLength]byte{} - emptySignature := [fieldparams.BLSSignatureLength]byte{} - emptyKzgCommitmentInclusionProof := [4][]byte{ - emptyRoot[:], emptyRoot[:], emptyRoot[:], emptyRoot[:], - } - // Create the private key, depending on the offset. privateKeyBytes := make([]byte, 32) for i := 0; i < 32; i++ { @@ -89,17 +91,10 @@ func createAndConnectPeer( } // Create the response. - resp := ethpb.DataColumnSidecar{ - ColumnIndex: identifier.ColumnIndex, - SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ - Header: header, - Signature: emptySignature[:], - }, - KzgCommitmentsInclusionProof: emptyKzgCommitmentInclusionProof[:], - } + resp := dataColumnSidecars[identifier.ColumnIndex] // Send the response. 
- err := WriteDataColumnSidecarChunk(stream, chainService, p2pService.Encoding(), &resp) + err := WriteDataColumnSidecarChunk(stream, chainService, p2pService.Encoding(), resp) require.NoError(t, err) } @@ -117,17 +112,84 @@ func createAndConnectPeer( p2pService.Connect(peer) } +func deterministicRandomness(seed int64) [32]byte { + // Converts an int64 to a byte slice + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.BigEndian, seed) + if err != nil { + logrus.WithError(err).Error("Failed to write int64 to bytes buffer") + return [32]byte{} + } + bytes := buf.Bytes() + + return sha256.Sum256(bytes) +} + +// Returns a serialized random field element in big-endian +func getRandFieldElement(seed int64) [32]byte { + bytes := deterministicRandomness(seed) + var r fr.Element + r.SetBytes(bytes[:]) + + return GoKZG.SerializeScalar(r) +} + +// Returns a random blob using the passed seed as entropy +func getRandBlob(seed int64) kzg.Blob { + var blob kzg.Blob + for i := 0; i < len(blob); i += 32 { + fieldElementBytes := getRandFieldElement(seed + int64(i)) + copy(blob[i:i+32], fieldElementBytes[:]) + } + return blob +} + +func generateCommitmentAndProof(blob kzg.Blob) (kzg.Commitment, kzg.Proof, error) { + commitment, err := kzg.BlobToKZGCommitment(&blob) + if err != nil { + return kzg.Commitment{}, kzg.Proof{}, err + } + proof, err := kzg.ComputeBlobKZGProof(&blob, commitment) + if err != nil { + return kzg.Commitment{}, kzg.Proof{}, err + } + return commitment, proof, err +} + func TestIncrementalDAS(t *testing.T) { - const custodyRequirement uint64 = 1 + const ( + blobCount = 3 + custodyRequirement uint64 = 1 + ) + + err := kzg.Start() + require.NoError(t, err) + + // Generate random blobs, commitments and inclusion proofs. + blobs := make([]kzg.Blob, blobCount) + kzgCommitments := make([][]byte, blobCount) + kzgProofs := make([][]byte, blobCount) - emptyRoot := [fieldparams.RootLength]byte{} - emptyHeader := ðpb.BeaconBlockHeader{ - ParentRoot: emptyRoot[:], - StateRoot: emptyRoot[:], - BodyRoot: emptyRoot[:], + for i := int64(0); i < blobCount; i++ { + blob := getRandBlob(int64(i)) + + kzgCommitment, kzgProof, err := generateCommitmentAndProof(blob) + require.NoError(t, err) + + blobs[i] = blob + kzgCommitments[i] = kzgCommitment[:] + kzgProofs[i] = kzgProof[:] } - emptyHeaderRoot, err := emptyHeader.HashTreeRoot() + dbBlock := util.NewBeaconBlockDeneb() + dbBlock.Block.Body.BlobKzgCommitments = kzgCommitments + sBlock, err := blocks.NewSignedBeaconBlock(dbBlock) + require.NoError(t, err) + + dataColumnSidecars, err := peerdas.DataColumnSidecars(sBlock, blobs) + require.NoError(t, err) + + blockRoot, err := dataColumnSidecars[0].GetSignedBlockHeader().Header.HashTreeRoot() require.NoError(t, err) testCases := []struct { @@ -198,13 +260,13 @@ func TestIncrementalDAS(t *testing.T) { chainService, clock := defaultMockChain(t) // Custody columns: [6, 38, 70, 102] - createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 1) + createAndConnectPeer(t, p2pService, chainService, dataColumnSidecars, custodyRequirement, tc.columnsNotToRespond, 1) // Custody columns: [3, 35, 67, 99] - createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 2) + createAndConnectPeer(t, p2pService, chainService, dataColumnSidecars, custodyRequirement, tc.columnsNotToRespond, 2) // Custody columns: [12, 44, 76, 108] - createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, 
tc.columnsNotToRespond, 3) + createAndConnectPeer(t, p2pService, chainService, dataColumnSidecars, custodyRequirement, tc.columnsNotToRespond, 3) service := &Service{ cfg: &config{ @@ -215,7 +277,7 @@ func TestIncrementalDAS(t *testing.T) { ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb}, } - actualSuccess, actualRoundSummaries, err := service.incrementalDAS(emptyHeaderRoot, tc.possibleColumnsToRequest, tc.samplesCount) + actualSuccess, actualRoundSummaries, err := service.incrementalDAS(blockRoot, tc.possibleColumnsToRequest, tc.samplesCount) require.NoError(t, err) require.Equal(t, tc.expectedSuccess, actualSuccess) From 2b534186245374025ba08a6117ce1203a60db136 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 4 Jul 2024 14:43:45 +0200 Subject: [PATCH 3/6] `VerifyKZGInclusionProofColumn`: Add unit test. --- consensus-types/blocks/kzg_test.go | 116 +++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/consensus-types/blocks/kzg_test.go b/consensus-types/blocks/kzg_test.go index 8bc6c5498315..e0fb3e8557ee 100644 --- a/consensus-types/blocks/kzg_test.go +++ b/consensus-types/blocks/kzg_test.go @@ -259,3 +259,119 @@ func Test_VerifyKZGInclusionProof(t *testing.T) { proof[2] = make([]byte, 32) require.ErrorIs(t, errInvalidInclusionProof, VerifyKZGInclusionProof(blob)) } + +func Test_VerifyKZGInclusionProofColumn(t *testing.T) { + const ( + blobCount = 3 + columnIndex = 0 + ) + + // Generate random KZG commitments `blobCount` blobs. + kzgCommitments := make([][]byte, blobCount) + + for i := 0; i < blobCount; i++ { + kzgCommitments[i] = make([]byte, 48) + _, err := rand.Read(kzgCommitments[i]) + require.NoError(t, err) + } + + pbBody := ðpb.BeaconBlockBodyDeneb{ + RandaoReveal: make([]byte, 96), + Eth1Data: ðpb.Eth1Data{ + DepositRoot: make([]byte, fieldparams.RootLength), + BlockHash: make([]byte, fieldparams.RootLength), + }, + Graffiti: make([]byte, 32), + SyncAggregate: ðpb.SyncAggregate{ + SyncCommitteeBits: make([]byte, fieldparams.SyncAggregateSyncCommitteeBytesLength), + SyncCommitteeSignature: make([]byte, fieldparams.BLSSignatureLength), + }, + ExecutionPayload: &enginev1.ExecutionPayloadDeneb{ + ParentHash: make([]byte, fieldparams.RootLength), + FeeRecipient: make([]byte, 20), + StateRoot: make([]byte, fieldparams.RootLength), + ReceiptsRoot: make([]byte, fieldparams.RootLength), + LogsBloom: make([]byte, 256), + PrevRandao: make([]byte, fieldparams.RootLength), + BaseFeePerGas: make([]byte, fieldparams.RootLength), + BlockHash: make([]byte, fieldparams.RootLength), + Transactions: make([][]byte, 0), + ExtraData: make([]byte, 0), + }, + BlobKzgCommitments: kzgCommitments, + } + + root, err := pbBody.HashTreeRoot() + require.NoError(t, err) + + body, err := NewBeaconBlockBody(pbBody) + require.NoError(t, err) + + kzgCommitmentsInclusionProof, err := MerkleProofKZGCommitments(body) + require.NoError(t, err) + + testCases := []struct { + name string + expectedError error + dataColumnSidecar *ethpb.DataColumnSidecar + }{ + { + name: "nilSignedBlockHeader", + expectedError: errNilBlockHeader, + dataColumnSidecar: ðpb.DataColumnSidecar{}, + }, + { + name: "nilHeader", + expectedError: errNilBlockHeader, + dataColumnSidecar: ðpb.DataColumnSidecar{ + SignedBlockHeader: ðpb.SignedBeaconBlockHeader{}, + }, + }, + { + name: "invalidBodyRoot", + expectedError: errInvalidBodyRoot, + dataColumnSidecar: ðpb.DataColumnSidecar{ + SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ + Header: ðpb.BeaconBlockHeader{}, + }, + }, + }, + { + name: 
"unverifiedMerkleProof", + expectedError: errInvalidInclusionProof, + dataColumnSidecar: ðpb.DataColumnSidecar{ + SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ + Header: ðpb.BeaconBlockHeader{ + BodyRoot: make([]byte, 32), + }, + }, + KzgCommitments: kzgCommitments, + }, + }, + { + name: "nominal", + expectedError: nil, + dataColumnSidecar: ðpb.DataColumnSidecar{ + KzgCommitments: kzgCommitments, + SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ + Header: ðpb.BeaconBlockHeader{ + BodyRoot: root[:], + }, + }, + KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := VerifyKZGInclusionProofColumn(tc.dataColumnSidecar) + if tc.expectedError == nil { + require.NoError(t, err) + return + } + + require.ErrorIs(t, tc.expectedError, err) + }) + } +} From 330a87e2ffe3f05e3425418da39a090fff57f6c5 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 4 Jul 2024 23:10:47 +0200 Subject: [PATCH 4/6] Make deepsource happy. --- beacon-chain/core/peerdas/helpers_test.go | 8 +- beacon-chain/sync/data_columns_sampling.go | 114 +++++++++--------- .../sync/data_columns_sampling_test.go | 14 +-- 3 files changed, 70 insertions(+), 66 deletions(-) diff --git a/beacon-chain/core/peerdas/helpers_test.go b/beacon-chain/core/peerdas/helpers_test.go index 73572db92879..eb934a7b6c39 100644 --- a/beacon-chain/core/peerdas/helpers_test.go +++ b/beacon-chain/core/peerdas/helpers_test.go @@ -50,16 +50,16 @@ func GetRandBlob(seed int64) kzg.Blob { return blob } -func GenerateCommitmentAndProof(blob kzg.Blob) (kzg.Commitment, kzg.Proof, error) { +func GenerateCommitmentAndProof(blob kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) { commitment, err := kzg.BlobToKZGCommitment(&blob) if err != nil { - return kzg.Commitment{}, kzg.Proof{}, err + return nil, nil, err } proof, err := kzg.ComputeBlobKZGProof(&blob, commitment) if err != nil { - return kzg.Commitment{}, kzg.Proof{}, err + return nil, nil, err } - return commitment, proof, err + return &commitment, &proof, err } func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) { diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index daef98d29957..46913b3d2a0b 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -173,7 +173,7 @@ func (s *Service) sampleDataColumnsFromPeer( retrievedColumns := make(map[uint64]bool, len(roDataColumns)) for _, roDataColumn := range roDataColumns { - if verified := verifyColumn(roDataColumn, root, pid, requestedColumns); verified { + if verifyColumn(roDataColumn, root, pid, requestedColumns) { retrievedColumns[roDataColumn.ColumnIndex] = true } } @@ -378,69 +378,73 @@ func (s *Service) DataColumnSamplingRoutine(ctx context.Context) { for { select { case e := <-stateChannel: - if e.Type != statefeed.BlockProcessed { - continue - } + s.processEvent(e, nonCustodyColums, samplesCount) - data, ok := e.Data.(*statefeed.BlockProcessedData) - if !ok { - log.Error("Event feed data is not of type *statefeed.BlockProcessedData") - continue - } + case <-s.ctx.Done(): + log.Debug("Context closed, exiting goroutine") + return - if !data.Verified { - // We only process blocks that have been verified - log.Error("Data is not verified") - continue - } + case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state feed failed") + } + } +} - if data.SignedBlock.Version() < version.Deneb { - log.Debug("Pre Deneb block, skipping data column 
sampling") - continue - } +func (s *Service) processEvent(e *feed.Event, nonCustodyColums map[uint64]bool, samplesCount uint64) { + if e.Type != statefeed.BlockProcessed { + return + } - // Get the commitments for this block. - commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments() - if err != nil { - log.WithError(err).Error("Failed to get blob KZG commitments") - continue - } + data, ok := e.Data.(*statefeed.BlockProcessedData) + if !ok { + log.Error("Event feed data is not of type *statefeed.BlockProcessedData") + return + } - // Skip if there are no commitments. - if len(commitments) == 0 { - log.Debug("No commitments in block, skipping data column sampling") - continue - } + if !data.Verified { + // We only process blocks that have been verified + log.Error("Data is not verified") + return + } - // Ramdomize all columns. - randomizedColumns := randomizeColumns(nonCustodyColums) + if data.SignedBlock.Version() < version.Deneb { + log.Debug("Pre Deneb block, skipping data column sampling") + return + } - // Sample data columns with incremental DAS. - ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount) - if err != nil { - log.WithError(err).Error("Error during incremental DAS") - } + // Get the commitments for this block. + commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments() + if err != nil { + log.WithError(err).Error("Failed to get blob KZG commitments") + return + } - if ok { - log.WithFields(logrus.Fields{ - "root": fmt.Sprintf("%#x", data.BlockRoot), - "columns": randomizedColumns, - "sampleCount": samplesCount, - }).Debug("Data column sampling successful") - } else { - log.WithFields(logrus.Fields{ - "root": fmt.Sprintf("%#x", data.BlockRoot), - "columns": randomizedColumns, - "sampleCount": samplesCount, - }).Warning("Data column sampling failed") - } + // Skip if there are no commitments. + if len(commitments) == 0 { + log.Debug("No commitments in block, skipping data column sampling") + return + } - case <-s.ctx.Done(): - log.Debug("Context closed, exiting goroutine") - return + // Ramdomize all columns. + randomizedColumns := randomizeColumns(nonCustodyColums) - case err := <-stateSub.Err(): - log.WithError(err).Error("Subscription to state feed failed") - } + // Sample data columns with incremental DAS. 
+ ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount) + if err != nil { + log.WithError(err).Error("Error during incremental DAS") + } + + if ok { + log.WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", data.BlockRoot), + "columns": randomizedColumns, + "sampleCount": samplesCount, + }).Debug("Data column sampling successful") + } else { + log.WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", data.BlockRoot), + "columns": randomizedColumns, + "sampleCount": samplesCount, + }).Warning("Data column sampling failed") } } diff --git a/beacon-chain/sync/data_columns_sampling_test.go b/beacon-chain/sync/data_columns_sampling_test.go index d818ebd639fb..af3526fdaf1a 100644 --- a/beacon-chain/sync/data_columns_sampling_test.go +++ b/beacon-chain/sync/data_columns_sampling_test.go @@ -144,16 +144,16 @@ func getRandBlob(seed int64) kzg.Blob { return blob } -func generateCommitmentAndProof(blob kzg.Blob) (kzg.Commitment, kzg.Proof, error) { - commitment, err := kzg.BlobToKZGCommitment(&blob) +func generateCommitmentAndProof(blob *kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) { + commitment, err := kzg.BlobToKZGCommitment(blob) if err != nil { - return kzg.Commitment{}, kzg.Proof{}, err + return nil, nil, err } - proof, err := kzg.ComputeBlobKZGProof(&blob, commitment) + proof, err := kzg.ComputeBlobKZGProof(blob, commitment) if err != nil { - return kzg.Commitment{}, kzg.Proof{}, err + return nil, nil, err } - return commitment, proof, err + return &commitment, &proof, err } func TestIncrementalDAS(t *testing.T) { @@ -173,7 +173,7 @@ func TestIncrementalDAS(t *testing.T) { for i := int64(0); i < blobCount; i++ { blob := getRandBlob(int64(i)) - kzgCommitment, kzgProof, err := generateCommitmentAndProof(blob) + kzgCommitment, kzgProof, err := generateCommitmentAndProof(&blob) require.NoError(t, err) blobs[i] = blob From b4ad9143819ddc18f7bc938102e22c7f39c9f2bf Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 8 Jul 2024 11:03:27 +0200 Subject: [PATCH 5/6] Address Nishant's comment. --- beacon-chain/sync/validate_data_column.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go index 62e344d04772..f6f4a77c1e9c 100644 --- a/beacon-chain/sync/validate_data_column.go +++ b/beacon-chain/sync/validate_data_column.go @@ -163,19 +163,18 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs return pubsub.ValidationReject, errors.New("incorrect proposer index") } + // Get the time at slot start. + startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot) + // Add specific debug log. - if logrus.GetLevel() >= logrus.DebugLevel { - // Get the time at slot start. 
- startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot) - if err == nil { - log.WithFields(logrus.Fields{ - "sinceSlotStartTime": receivedTime.Sub(startTime), - "validationTime": s.cfg.clock.Now().Sub(receivedTime), - "columnIndex": ds.ColumnIndex, - }).Debug("Received data column sidecar") - } else { - log.WithError(err).Error("Failed to calculate slot time") - } + if err == nil { + log.WithFields(logrus.Fields{ + "sinceSlotStartTime": receivedTime.Sub(startTime), + "validationTime": s.cfg.clock.Now().Sub(receivedTime), + "columnIndex": ds.ColumnIndex, + }).Debug("Received data column sidecar") + } else { + log.WithError(err).Error("Failed to calculate slot time") } // TODO: Transform this whole function so it looks like to the `validateBlob` From 6431659ef2897c05050697b7112901dce836e965 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 8 Jul 2024 11:20:16 +0200 Subject: [PATCH 6/6] Address Nishant's comment. --- beacon-chain/sync/validate_data_column.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go index f6f4a77c1e9c..b9e1ae48168d 100644 --- a/beacon-chain/sync/validate_data_column.go +++ b/beacon-chain/sync/validate_data_column.go @@ -137,7 +137,7 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs return pubsub.ValidationReject, errors.New("failed to verify kzg proof of column") } - // ????? + // [REJECT] The proposer signature of sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey. parentState, err := s.cfg.stateGen.StateByRoot(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot)) if err != nil { return pubsub.ValidationIgnore, err @@ -147,7 +147,6 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs return pubsub.ValidationReject, err } - // [REJECT] The proposer signature of sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey. parentRoot := ds.SignedBlockHeader.Header.ParentRoot parentState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, parentState, parentRoot, ds.SignedBlockHeader.Header.Slot) if err != nil {
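
The flow these patches exercise can be seen end to end in a few lines: commit to blobs, extend them into data column sidecars, then apply the same two KZG checks used by `validateDataColumn` and `verifyColumn`. The sketch below is illustrative only and is not part of the patch series; it assumes it sits in the `beacon-chain/sync` test package next to `TestIncrementalDAS`, reusing that file's imports and its `getRandBlob` / `generateCommitmentAndProof` helpers with the post-PATCH 4 signatures. The test name, `dnBlock` variable, and blob count are invented for the example.

// Minimal sketch (not part of the patches): build data column sidecars from
// random blobs and verify them the way the gossip validator and the sampler do.
func TestDataColumnSidecarKZGFlow_Sketch(t *testing.T) {
	const blobCount = 3

	// Load the KZG trusted setup, as TestIncrementalDAS does.
	require.NoError(t, kzg.Start())

	blobs := make([]kzg.Blob, blobCount)
	kzgCommitments := make([][]byte, blobCount)

	for i := 0; i < blobCount; i++ {
		// Deterministic pseudo-random blob and its commitment (helpers defined in this test file).
		blob := getRandBlob(int64(i))
		commitment, _, err := generateCommitmentAndProof(&blob)
		require.NoError(t, err)

		blobs[i] = blob
		kzgCommitments[i] = commitment[:]
	}

	// Put the commitments into a Deneb block so the sidecars carry a real inclusion proof.
	dnBlock := util.NewBeaconBlockDeneb()
	dnBlock.Block.Body.BlobKzgCommitments = kzgCommitments
	sBlock, err := blocks.NewSignedBeaconBlock(dnBlock)
	require.NoError(t, err)

	// Extend the blobs into one sidecar per column.
	sidecars, err := peerdas.DataColumnSidecars(sBlock, blobs)
	require.NoError(t, err)

	for _, sidecar := range sidecars {
		// [REJECT] The sidecar's kzg_commitments inclusion proof is valid.
		require.NoError(t, blocks.VerifyKZGInclusionProofColumn(sidecar))

		// [REJECT] The sidecar's column data is valid.
		verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(sidecar)
		require.NoError(t, err)
		require.Equal(t, true, verified)
	}
}

Checking the inclusion proof before the column KZG proofs mirrors the order used in `validateDataColumn` and `verifyColumn`, where the cheaper Merkle check runs before the proof verification.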