Skip to content

Commit

Permalink
Refactor batch verifier for sharing across packages (#13812)
Browse files Browse the repository at this point in the history
* refactor batch verifier to share with pending queue

* unit test for batch verifier

---------

Co-authored-by: Kasey Kirkham <[email protected]>
(cherry picked from commit cdd1d81)
  • Loading branch information
kasey authored and prestonvanloon committed Mar 27, 2024
1 parent 3d22302 commit b3053dc
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 32 deletions.
3 changes: 0 additions & 3 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ go_library(
"log.go",
"round_robin.go",
"service.go",
"verification.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/transition:go_default_library",
Expand All @@ -41,7 +39,6 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//container/leaky-bucket:go_default_library",
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//math:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -167,7 +168,7 @@ func (s *Service) processFetchedDataRegSync(
if len(bwb) == 0 {
return
}
bv := newBlobBatchVerifier(s.newBlobVerifier)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
Expand Down Expand Up @@ -326,7 +327,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
}

bv := newBlobBatchVerifier(s.newBlobVerifier)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
Expand Down
8 changes: 7 additions & 1 deletion beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
if len(sidecars) != len(req) {
continue
}
bv := newBlobBatchVerifier(s.newBlobVerifier)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil {
Expand All @@ -362,3 +362,9 @@ func shufflePeers(pids []peer.ID) {
pids[i], pids[j] = pids[j], pids[i]
})
}

func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return ini.NewBlobVerifier(b, reqs)
}
}
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
if len(sidecars) != len(request) {
return fmt.Errorf("received %d blob sidecars, expected %d for RPC", len(sidecars), len(request))
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueSidecarRequirements)
for _, sidecar := range sidecars {
if err := verify.BlobAlignsWithBlock(sidecar, RoBlock); err != nil {
return err
}
log.WithFields(blobFields(sidecar)).Debug("Received blob sidecar RPC")
}

vscs, err := verification.BlobSidecarSliceNoop(sidecars)
vscs, err := bv.VerifiedROBlobs(ctx, RoBlock, sidecars)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/verification/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"batch.go",
"blob.go",
"cache.go",
"error.go",
Expand Down Expand Up @@ -45,6 +46,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"batch_test.go",
"blob_test.go",
"cache_test.go",
"initializer_test.go",
Expand All @@ -69,5 +71,6 @@ go_test(
"//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
],
)
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package initialsync
package verification

import (
"context"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
)
Expand All @@ -20,40 +18,35 @@ var (
ErrBatchBlockRootMismatch = errors.New("Sidecar block header root does not match signed block")
)

func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return ini.NewBlobVerifier(b, reqs)
}
}

func newBlobBatchVerifier(newVerifier verification.NewBlobVerifier) *BlobBatchVerifier {
// NewBlobBatchVerifier initializes a blob batch verifier. It requires the caller to correctly specify
// verification Requirements and to also pass in a NewBlobVerifier, which is a callback function that
// returns a new BlobVerifier for handling a single blob in the batch.
func NewBlobBatchVerifier(newVerifier NewBlobVerifier, reqs []Requirement) *BlobBatchVerifier {
return &BlobBatchVerifier{
verifyKzg: kzg.Verify,
newVerifier: newVerifier,
reqs: reqs,
}
}

type kzgVerifier func(b ...blocks.ROBlob) error

// BlobBatchVerifier solves problems that come from verifying batches of blobs from RPC.
// First: we only update forkchoice after the entire batch has completed, so the n+1 elements in the batch
// won't be in forkchoice yet.
// Second: it is more efficient to batch some verifications, like kzg commitment verification. Batch adds a
// method to BlobVerifier to verify the kzg commitments of all blob sidecars for a block together, then using the cached
// result of the batch verification when verifying the individual blobs.
type BlobBatchVerifier struct {
verifyKzg kzgVerifier
newVerifier verification.NewBlobVerifier
verifyKzg roblobCommitmentVerifier
newVerifier NewBlobVerifier
reqs []Requirement
}

var _ das.BlobBatchVerifier = &BlobBatchVerifier{}

// VerifiedROBlobs satisfies the das.BlobBatchVerifier interface, used by das.AvailabilityStore.
func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.ROBlock, scs []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) {
if len(scs) == 0 {
return nil, nil
}
// We assume the proposer was validated wrt the block in batch block processing before performing the DA check.

// We assume the proposer is validated wrt the block in batch block processing before performing the DA check.
// So at this stage we just need to make sure the value being signed and signature bytes match the block.
for i := range scs {
if blk.Signature() != bytesutil.ToBytes96(scs[i].SignedBlockHeader.Signature) {
Expand All @@ -71,7 +64,7 @@ func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.
}
vs := make([]blocks.VerifiedROBlob, len(scs))
for i := range scs {
vb, err := batch.verifyOneBlob(ctx, scs[i])
vb, err := batch.verifyOneBlob(scs[i])
if err != nil {
return nil, err
}
Expand All @@ -80,13 +73,13 @@ func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.
return vs, nil
}

func (batch *BlobBatchVerifier) verifyOneBlob(ctx context.Context, sc blocks.ROBlob) (blocks.VerifiedROBlob, error) {
func (batch *BlobBatchVerifier) verifyOneBlob(sc blocks.ROBlob) (blocks.VerifiedROBlob, error) {
vb := blocks.VerifiedROBlob{}
bv := batch.newVerifier(sc, verification.InitsyncSidecarRequirements)
bv := batch.newVerifier(sc, batch.reqs)
// We can satisfy the following 2 requirements immediately because VerifiedROBlobs always verifies commitments
// and block signature for all blobs in the batch before calling verifyOneBlob.
bv.SatisfyRequirement(verification.RequireSidecarKzgProofVerified)
bv.SatisfyRequirement(verification.RequireValidProposerSignature)
bv.SatisfyRequirement(RequireSidecarKzgProofVerified)
bv.SatisfyRequirement(RequireValidProposerSignature)

if err := bv.BlobIndexInBounds(); err != nil {
return vb, err
Expand Down
189 changes: 189 additions & 0 deletions beacon-chain/verification/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package verification

import (
"context"
"testing"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/stretchr/testify/require"
)

func TestBatchVerifier(t *testing.T) {
ctx := context.Background()
mockCV := func(err error) roblobCommitmentVerifier {
return func(...blocks.ROBlob) error {
return err
}
}
var invCmtErr = errors.New("mock invalid commitment")
type vbcbt func() (blocks.VerifiedROBlob, error)
vbcb := func(bl blocks.ROBlob, err error) vbcbt {
return func() (blocks.VerifiedROBlob, error) {
return blocks.VerifiedROBlob{ROBlob: bl}, err
}
}
cases := []struct {
name string
nv func() NewBlobVerifier
cv roblobCommitmentVerifier
bandb func(t *testing.T, n int) (blocks.ROBlock, []blocks.ROBlob)
err error
nblobs int
reqs []Requirement
}{
{
name: "no blobs",
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)}
}
},
nblobs: 0,
},
{
name: "happy path",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nblobs: 3,
},
{
name: "partial batch",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
// Add extra blobs to the block that we won't return
blk, blbs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb+3)
return blk, blbs[0:3]
},
nblobs: 3,
},
{
name: "invalid commitment",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
cv: mockCV(invCmtErr),
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
err: invCmtErr,
nblobs: 1,
},
{
name: "signature mismatch",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
blk, blbs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
blbs[0].SignedBlockHeader.Signature = []byte("wrong")
return blk, blbs
},
err: ErrBatchSignatureMismatch,
nblobs: 2,
},
{
name: "root mismatch",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
blk, blbs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
wr, err := blocks.NewROBlobWithRoot(blbs[0].BlobSidecar, bytesutil.ToBytes32([]byte("wrong")))
require.NoError(t, err)
blbs[0] = wr
return blk, blbs
},
err: ErrBatchBlockRootMismatch,
nblobs: 1,
},
{
name: "idx oob",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{
ErrBlobIndexInBounds: ErrBlobIndexInvalid,
cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nblobs: 1,
err: ErrBlobIndexInvalid,
},
{
name: "inclusion proof invalid",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{
ErrSidecarInclusionProven: ErrSidecarInclusionProofInvalid,
cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nblobs: 1,
err: ErrSidecarInclusionProofInvalid,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
blk, blbs := c.bandb(t, c.nblobs)
reqs := c.reqs
if reqs == nil {
reqs = InitsyncSidecarRequirements
}
bbv := NewBlobBatchVerifier(c.nv(), reqs)
if c.cv == nil {
bbv.verifyKzg = mockCV(nil)
} else {
bbv.verifyKzg = c.cv
}
vb, err := bbv.VerifiedROBlobs(ctx, blk, blbs)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
require.Equal(t, c.nblobs, len(vb))
})
}
}
3 changes: 3 additions & 0 deletions beacon-chain/verification/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ var InitsyncSidecarRequirements = requirementList(GossipSidecarRequirements).exc
// BackfillSidecarRequirements is the same as InitsyncSidecarRequirements.
var BackfillSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding()

// PendingQueueSidecarRequirements is the same as InitsyncSidecarRequirements, used by the pending blocks queue.
var PendingQueueSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding()

var (
ErrBlobInvalid = errors.New("blob failed verification")
// ErrBlobIndexInvalid means RequireBlobIndexInBounds failed.
Expand Down
Loading

0 comments on commit b3053dc

Please sign in to comment.