Skip to content

Commit

Permalink
Add /eth/v2/validator/aggregate_and_proofs (#14490)
Browse files Browse the repository at this point in the history
* fix endpoint

* changelog + gaz

* add to endpoints test

* Radek' review

* James' review

* fix duplication a bit

* fix changelog
  • Loading branch information
saolyn authored Oct 18, 2024
1 parent ffc443b commit 4aa5410
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Added GetBlockAttestationsV2 endpoint.
- Light client support: Consensus types for Electra
- Added SubmitPoolAttesterSlashingV2 endpoint.
- Added SubmitAggregateAndProofsRequestV2 endpoint.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion api/server/structs/endpoints_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type SubmitContributionAndProofsRequest struct {
}

type SubmitAggregateAndProofsRequest struct {
Data []*SignedAggregateAttestationAndProof `json:"data"`
Data []json.RawMessage `json:"data"`
}

type SubmitSyncCommitteeSubscriptionsRequest struct {
Expand Down
10 changes: 10 additions & 0 deletions beacon-chain/rpc/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ func (s *Service) validatorEndpoints(
handler: server.SubmitAggregateAndProofs,
methods: []string{http.MethodPost},
},
{
template: "/eth/v2/validator/aggregate_and_proofs",
name: namespace + ".SubmitAggregateAndProofsV2",
middleware: []middleware.Middleware{
middleware.ContentTypeHandler([]string{api.JsonMediaType}),
middleware.AcceptHeaderHandler([]string{api.JsonMediaType}),
},
handler: server.SubmitAggregateAndProofsV2,
methods: []string{http.MethodPost},
},
{
template: "/eth/v1/validator/sync_committee_contribution",
name: namespace + ".ProduceSyncCommitteeContribution",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func Test_endpoints(t *testing.T) {
"/eth/v1/validator/attestation_data": {http.MethodGet},
"/eth/v1/validator/aggregate_attestation": {http.MethodGet},
"/eth/v1/validator/aggregate_and_proofs": {http.MethodPost},
"/eth/v2/validator/aggregate_and_proofs": {http.MethodPost},
"/eth/v1/validator/beacon_committee_subscriptions": {http.MethodPost},
"/eth/v1/validator/sync_committee_subscriptions": {http.MethodPost},
"/eth/v1/validator/beacon_committee_selections": {http.MethodPost},
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/eth/validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ go_test(
"//encoding/bytesutil:go_default_library",
"//network/httputil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/mock:go_default_library",
"//testing/require:go_default_library",
Expand Down
117 changes: 102 additions & 15 deletions beacon-chain/rpc/eth/validator/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api"
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/network/httputil"
ethpbalpha "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -118,30 +120,34 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ
ctx, span := trace.StartSpan(r.Context(), "validator.SubmitContributionAndProofs")
defer span.End()

var req structs.SubmitContributionAndProofsRequest
err := json.NewDecoder(r.Body).Decode(&req.Data)
switch {
case errors.Is(err, io.EOF):
httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
return
case err != nil:
httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
var reqData []json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&reqData); err != nil {
if errors.Is(err, io.EOF) {
httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
} else {
httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
}
return
}
if len(req.Data) == 0 {
if len(reqData) == 0 {
httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
return
}

for _, item := range req.Data {
consensusItem, err := item.ToConsensus()
for _, item := range reqData {
var contribution structs.SignedContributionAndProof
if err := json.Unmarshal(item, &contribution); err != nil {
httputil.HandleError(w, "Could not decode item: "+err.Error(), http.StatusBadRequest)
return
}
consensusItem, err := contribution.ToConsensus()
if err != nil {
httputil.HandleError(w, "Could not convert request contribution to consensus contribution: "+err.Error(), http.StatusBadRequest)
httputil.HandleError(w, "Could not convert contribution to consensus format: "+err.Error(), http.StatusBadRequest)
return
}
rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem)
if rpcError != nil {
if rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem); rpcError != nil {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
}
}
}
Expand All @@ -168,7 +174,13 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request

broadcastFailed := false
for _, item := range req.Data {
consensusItem, err := item.ToConsensus()
var signedAggregate structs.SignedAggregateAttestationAndProof
err := json.Unmarshal(item, &signedAggregate)
if err != nil {
httputil.HandleError(w, "Could not decode item: "+err.Error(), http.StatusBadRequest)
return
}
consensusItem, err := signedAggregate.ToConsensus()
if err != nil {
httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
Expand All @@ -191,6 +203,81 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
}
}

// SubmitAggregateAndProofsV2 verifies given aggregate and proofs and publishes them on appropriate gossipsub topic.
func (s *Server) SubmitAggregateAndProofsV2(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.SubmitAggregateAndProofsV2")
defer span.End()

var reqData []json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&reqData); err != nil {
if errors.Is(err, io.EOF) {
httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
} else {
httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
}
return
}
if len(reqData) == 0 {
httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
return
}

versionHeader := r.Header.Get(api.VersionHeader)
if versionHeader == "" {
httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
}
v, err := version.FromString(versionHeader)
if err != nil {
httputil.HandleError(w, "Invalid version: "+err.Error(), http.StatusBadRequest)
return
}

broadcastFailed := false
var rpcError *core.RpcError
for _, raw := range reqData {
if v >= version.Electra {
var signedAggregate structs.SignedAggregateAttestationAndProofElectra
err = json.Unmarshal(raw, &signedAggregate)
if err != nil {
httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest)
return
}
consensusItem, err := signedAggregate.ToConsensus()
if err != nil {
httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
}
rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
} else {
var signedAggregate structs.SignedAggregateAttestationAndProof
err = json.Unmarshal(raw, &signedAggregate)
if err != nil {
httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest)
return
}
consensusItem, err := signedAggregate.ToConsensus()
if err != nil {
httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
}
rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
}

if rpcError != nil {
var aggregateBroadcastFailedError *core.AggregateBroadcastFailedError
if errors.As(rpcError.Err, &aggregateBroadcastFailedError) {
broadcastFailed = true
} else {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
}
}
}
if broadcastFailed {
httputil.HandleError(w, "Could not broadcast one or more signed aggregated attestations", http.StatusInternalServerError)
}
}

// SubmitSyncCommitteeSubscription subscribe to a number of sync committee subnets.
//
// Subscribing to sync committee subnets is an action performed by VC to enable
Expand Down
Loading

0 comments on commit 4aa5410

Please sign in to comment.