Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerDAS: Add KZG verification when sampling #14187

Merged
merged 7 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ func GetRandBlob(seed int64) kzg.Blob {
return blob
}

func GenerateCommitmentAndProof(blob kzg.Blob) (kzg.Commitment, kzg.Proof, error) {
func GenerateCommitmentAndProof(blob kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) {
commitment, err := kzg.BlobToKZGCommitment(&blob)
if err != nil {
return kzg.Commitment{}, kzg.Proof{}, err
return nil, nil, err
}
proof, err := kzg.ComputeBlobKZGProof(&blob, commitment)
if err != nil {
return kzg.Commitment{}, kzg.Proof{}, err
return nil, nil, err
}
return commitment, proof, err
return &commitment, &proof, err
}

func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ go_test(
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
Expand Down Expand Up @@ -258,6 +259,8 @@ go_test(
"//testing/util:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_d4l3k_messagediff//:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
Expand Down
227 changes: 137 additions & 90 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
Expand Down Expand Up @@ -77,8 +78,78 @@ func (s *Service) custodyColumnsFromPeer(pid peer.ID) (map[uint64]bool, error) {
return custodyColumns, nil
}

// verifyColumn verifies the retrieved column against the root, the index,
// the KZG inclusion and the KZG proof.
func verifyColumn(
roDataColumn blocks.RODataColumn,
root [32]byte,
pid peer.ID,
requestedColumns map[uint64]bool,
) bool {
retrievedColumn := roDataColumn.ColumnIndex

// Filter out columns with incorrect root.
actualRoot := roDataColumn.BlockRoot()
if actualRoot != root {
log.WithFields(logrus.Fields{
"peerID": pid,
"requestedRoot": fmt.Sprintf("%#x", root),
"actualRoot": fmt.Sprintf("%#x", actualRoot),
}).Debug("Retrieved root does not match requested root")

return false
}

// Filter out columns that were not requested.
if !requestedColumns[retrievedColumn] {
columnsToSampleList := sortedSliceFromMap(requestedColumns)

log.WithFields(logrus.Fields{
"peerID": pid,
"requestedColumns": columnsToSampleList,
"retrievedColumn": retrievedColumn,
}).Debug("Retrieved column was not requested")

return false
}

// Filter out columns which did not pass the KZG inclusion proof verification.
if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn.DataColumnSidecar); err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).Debug("Failed to verify KZG inclusion proof for retrieved column")

return false
}

// Filter out columns which did not pass the KZG proof verification.
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn.DataColumnSidecar)
if err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).Debug("Error when verifying KZG proof for retrieved column")

return false
}

if !verified {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).Debug("Failed to verify KZG proof for retrieved column")

return false
}

return true
}

// sampleDataColumnsFromPeer samples data columns from a peer.
// It filters out columns that were not requested and columns with incorrect root.
// It returns the retrieved columns.
func (s *Service) sampleDataColumnsFromPeer(
pid peer.ID,
Expand All @@ -102,39 +173,10 @@ func (s *Service) sampleDataColumnsFromPeer(

retrievedColumns := make(map[uint64]bool, len(roDataColumns))

// Remove retrieved items from rootsByDataColumnIndex.
for _, roDataColumn := range roDataColumns {
retrievedColumn := roDataColumn.ColumnIndex

actualRoot := roDataColumn.BlockRoot()

// Filter out columns with incorrect root.
if actualRoot != root {
// TODO: Should we decrease the peer score here?
log.WithFields(logrus.Fields{
"peerID": pid,
"requestedRoot": fmt.Sprintf("%#x", root),
"actualRoot": fmt.Sprintf("%#x", actualRoot),
}).Warning("Actual root does not match requested root")

continue
}

// Filter out columns that were not requested.
if !requestedColumns[retrievedColumn] {
// TODO: Should we decrease the peer score here?
columnsToSampleList := sortedSliceFromMap(requestedColumns)

log.WithFields(logrus.Fields{
"peerID": pid,
"requestedColumns": columnsToSampleList,
"retrievedColumn": retrievedColumn,
}).Warning("Retrieved column was not requested")

continue
if verifyColumn(roDataColumn, root, pid, requestedColumns) {
retrievedColumns[roDataColumn.ColumnIndex] = true
}

retrievedColumns[retrievedColumn] = true
}

if len(retrievedColumns) == len(requestedColumns) {
Expand Down Expand Up @@ -337,73 +379,78 @@ func (s *Service) DataColumnSamplingRoutine(ctx context.Context) {
for {
select {
case e := <-stateChannel:
if e.Type != statefeed.BlockProcessed {
continue
}
s.processEvent(e, nonCustodyColums, samplesCount)

data, ok := e.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
continue
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return

if !data.Verified {
// We only process blocks that have been verified
log.Error("Data is not verified")
continue
}
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state feed failed")
}
}
}

if data.SignedBlock.Version() < version.Deneb {
log.Debug("Pre Deneb block, skipping data column sampling")
continue
}
if coreTime.PeerDASIsActive(data.Slot) {
// We do not trigger sampling if peerDAS is not active yet.
continue
}
func (s *Service) processEvent(e *feed.Event, nonCustodyColums map[uint64]bool, samplesCount uint64) {
if e.Type != statefeed.BlockProcessed {
return
}

// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
continue
}
data, ok := e.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
return
}

// Skip if there are no commitments.
if len(commitments) == 0 {
log.Debug("No commitments in block, skipping data column sampling")
continue
}
if !data.Verified {
// We only process blocks that have been verified
log.Error("Data is not verified")
return
}

// Ramdomize all columns.
randomizedColumns := randomizeColumns(nonCustodyColums)
if data.SignedBlock.Version() < version.Deneb {
log.Debug("Pre Deneb block, skipping data column sampling")
return
}

// Sample data columns with incremental DAS.
ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Error during incremental DAS")
}
if coreTime.PeerDASIsActive(data.Slot) {
// We do not trigger sampling if peerDAS is not active yet.
return
}

if ok {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Debug("Data column sampling successful")
} else {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Warning("Data column sampling failed")
}
// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
return
}

case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
// Skip if there are no commitments.
if len(commitments) == 0 {
log.Debug("No commitments in block, skipping data column sampling")
return
}

case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state feed failed")
}
// Ramdomize all columns.
randomizedColumns := randomizeColumns(nonCustodyColums)

// Sample data columns with incremental DAS.
ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Error during incremental DAS")
}

if ok {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Debug("Data column sampling successful")
} else {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
"sampleCount": samplesCount,
}).Warning("Data column sampling failed")
}
}
Loading
Loading