Skip to content

Commit

Permalink
PeerDAS: Add KZG verification when sampling (#14187)
Browse files Browse the repository at this point in the history
* `validateDataColumn`: Add comments and remove debug computation.

* `sampleDataColumnsFromPeer`: Add KZG verification

* `VerifyKZGInclusionProofColumn`: Add unit test.

* Make deepsource happy.

* Address Nishant's comment.

* Address Nishant's comment.
  • Loading branch information
nalepae committed Jul 9, 2024
1 parent 08b3ccc commit 60652c4
Show file tree
Hide file tree
Showing 6 changed files with 398 additions and 136 deletions.
8 changes: 4 additions & 4 deletions beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ func GetRandBlob(seed int64) kzg.Blob {
return blob
}

func GenerateCommitmentAndProof(blob kzg.Blob) (kzg.Commitment, kzg.Proof, error) {
func GenerateCommitmentAndProof(blob kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) {
commitment, err := kzg.BlobToKZGCommitment(&blob)
if err != nil {
return kzg.Commitment{}, kzg.Proof{}, err
return nil, nil, err
}
proof, err := kzg.ComputeBlobKZGProof(&blob, commitment)
if err != nil {
return kzg.Commitment{}, kzg.Proof{}, err
return nil, nil, err
}
return commitment, proof, err
return &commitment, &proof, err
}

func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ go_test(
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
Expand Down Expand Up @@ -258,6 +259,8 @@ go_test(
"//testing/util:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_d4l3k_messagediff//:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
Expand Down
227 changes: 137 additions & 90 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
Expand Down Expand Up @@ -77,8 +78,78 @@ func (s *Service) custodyColumnsFromPeer(pid peer.ID) (map[uint64]bool, error) {
return custodyColumns, nil
}

// verifyColumn checks a retrieved data column sidecar against the expected
// block root and the set of requested column indices, then verifies both the
// KZG commitment inclusion proof and the KZG proofs themselves.
// It returns true only if every check passes.
func verifyColumn(
	roDataColumn blocks.RODataColumn,
	root [32]byte,
	pid peer.ID,
	requestedColumns map[uint64]bool,
) bool {
	columnIndex := roDataColumn.ColumnIndex

	// Reject columns whose block root differs from the requested one.
	if gotRoot := roDataColumn.BlockRoot(); gotRoot != root {
		log.WithFields(logrus.Fields{
			"peerID":        pid,
			"requestedRoot": fmt.Sprintf("%#x", root),
			"actualRoot":    fmt.Sprintf("%#x", gotRoot),
		}).Debug("Retrieved root does not match requested root")

		return false
	}

	// Reject columns we did not ask for.
	if !requestedColumns[columnIndex] {
		log.WithFields(logrus.Fields{
			"peerID":           pid,
			"requestedColumns": sortedSliceFromMap(requestedColumns),
			"retrievedColumn":  columnIndex,
		}).Debug("Retrieved column was not requested")

		return false
	}

	// Reject columns whose commitments are not proven to be included in the
	// block body (KZG inclusion proof).
	if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn.DataColumnSidecar); err != nil {
		log.WithFields(logrus.Fields{
			"peerID": pid,
			"root":   fmt.Sprintf("%#x", root),
			"index":  columnIndex,
		}).Debug("Failed to verify KZG inclusion proof for retrieved column")

		return false
	}

	// Reject columns whose KZG proofs fail verification, distinguishing an
	// error during verification from a clean "not verified" result.
	verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn.DataColumnSidecar)
	if err != nil {
		log.WithFields(logrus.Fields{
			"peerID": pid,
			"root":   fmt.Sprintf("%#x", root),
			"index":  columnIndex,
		}).Debug("Error when verifying KZG proof for retrieved column")

		return false
	}

	if !verified {
		log.WithFields(logrus.Fields{
			"peerID": pid,
			"root":   fmt.Sprintf("%#x", root),
			"index":  columnIndex,
		}).Debug("Failed to verify KZG proof for retrieved column")

		return false
	}

	return true
}

// sampleDataColumnsFromPeer samples data columns from a peer.
// It filters out columns that were not requested and columns with incorrect root.
// It returns the retrieved columns.
func (s *Service) sampleDataColumnsFromPeer(
pid peer.ID,
Expand All @@ -102,39 +173,10 @@ func (s *Service) sampleDataColumnsFromPeer(

retrievedColumns := make(map[uint64]bool, len(roDataColumns))

// Remove retrieved items from rootsByDataColumnIndex.
for _, roDataColumn := range roDataColumns {
retrievedColumn := roDataColumn.ColumnIndex

actualRoot := roDataColumn.BlockRoot()

// Filter out columns with incorrect root.
if actualRoot != root {
// TODO: Should we decrease the peer score here?
log.WithFields(logrus.Fields{
"peerID": pid,
"requestedRoot": fmt.Sprintf("%#x", root),
"actualRoot": fmt.Sprintf("%#x", actualRoot),
}).Warning("Actual root does not match requested root")

continue
}

// Filter out columns that were not requested.
if !requestedColumns[retrievedColumn] {
// TODO: Should we decrease the peer score here?
columnsToSampleList := sortedSliceFromMap(requestedColumns)

log.WithFields(logrus.Fields{
"peerID": pid,
"requestedColumns": columnsToSampleList,
"retrievedColumn": retrievedColumn,
}).Warning("Retrieved column was not requested")

continue
if verifyColumn(roDataColumn, root, pid, requestedColumns) {
retrievedColumns[roDataColumn.ColumnIndex] = true
}

retrievedColumns[retrievedColumn] = true
}

if len(retrievedColumns) == len(requestedColumns) {
Expand Down Expand Up @@ -337,73 +379,78 @@ func (s *Service) DataColumnSamplingRoutine(ctx context.Context) {
for {
select {
case e := <-stateChannel:
if e.Type != statefeed.BlockProcessed {
continue
}
s.processEvent(e, nonCustodyColums, samplesCount)

data, ok := e.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
continue
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return

if !data.Verified {
// We only process blocks that have been verified
log.Error("Data is not verified")
continue
}
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state feed failed")
}
}
}

if data.SignedBlock.Version() < version.Deneb {
log.Debug("Pre Deneb block, skipping data column sampling")
continue
}
if coreTime.PeerDASIsActive(data.Slot) {
// We do not trigger sampling if peerDAS is not active yet.
continue
}
func (s *Service) processEvent(e *feed.Event, nonCustodyColums map[uint64]bool, samplesCount uint64) {
if e.Type != statefeed.BlockProcessed {
return
}

// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
continue
}
data, ok := e.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
return
}

// Skip if there are no commitments.
if len(commitments) == 0 {
log.Debug("No commitments in block, skipping data column sampling")
continue
}
if !data.Verified {
// We only process blocks that have been verified
log.Error("Data is not verified")
return
}

// Ramdomize all columns.
randomizedColumns := randomizeColumns(nonCustodyColums)
if data.SignedBlock.Version() < version.Deneb {
log.Debug("Pre Deneb block, skipping data column sampling")
return
}

// Sample data columns with incremental DAS.
ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Error during incremental DAS")
}
if coreTime.PeerDASIsActive(data.Slot) {
// We do not trigger sampling if peerDAS is not active yet.
return
}

if ok {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Debug("Data column sampling successful")
} else {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Warning("Data column sampling failed")
}
// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
return
}

case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
// Skip if there are no commitments.
if len(commitments) == 0 {
log.Debug("No commitments in block, skipping data column sampling")
return
}

case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state feed failed")
}
// Ramdomize all columns.
randomizedColumns := randomizeColumns(nonCustodyColums)

// Sample data columns with incremental DAS.
ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Error during incremental DAS")
}

if ok {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Debug("Data column sampling successful")
} else {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Warning("Data column sampling failed")
}
}
Loading

0 comments on commit 60652c4

Please sign in to comment.