From 55a29a46708264348d627f4d9da69513db38aa37 Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Mon, 5 Feb 2024 21:13:51 +0530 Subject: [PATCH] Implement beacon committee selections (#13503) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * implement beacon committee selections * fix build * fix lint * fix lint * Update beacon-chain/rpc/eth/shared/structs.go Co-authored-by: Radosław Kapka * Update validator/client/beacon-api/beacon_committee_selections.go Co-authored-by: Radosław Kapka * Update validator/client/beacon-api/beacon_committee_selections.go Co-authored-by: Radosław Kapka * Update validator/client/beacon-api/beacon_committee_selections.go Co-authored-by: Radosław Kapka * move beacon committee selection structs to validator module * fix bazel build files * add support for POST and GET endpoints for get state validators query * add a handler to return error from beacon node * move beacon committee selection to validator top-level module * fix bazel * re-arrange fields to fix lint * fix TestServer_InitializeRoutes * fix build and lint * fix build and lint * fix TestSubmitAggregateAndProof_Distributed --------- Co-authored-by: Radosław Kapka --- beacon-chain/rpc/eth/validator/handlers.go | 6 + beacon-chain/rpc/service.go | 1 + beacon-chain/rpc/service_test.go | 4 +- cmd/validator/flags/flags.go | 7 + cmd/validator/main.go | 1 + cmd/validator/usage.go | 1 + crypto/bls/common/mock/interface_mock.go | 138 ++++++------- .../validator-mock/validator_client_mock.go | 16 ++ validator/client/aggregate.go | 24 ++- validator/client/aggregate_test.go | 59 ++++++ validator/client/beacon-api/BUILD.bazel | 2 + .../client/beacon-api/activation_test.go | 8 + .../beacon-api/beacon_api_validator_client.go | 4 + .../beacon-api/beacon_committee_selections.go | 36 ++++ .../beacon_committee_selections_test.go | 124 ++++++++++++ validator/client/beacon-api/index_test.go | 23 +++ .../mock/beacon_block_converter_mock.go | 2 +- .../client/beacon-api/mock/duties_mock.go | 2 +- .../client/beacon-api/mock/genesis_mock.go | 12 +- .../beacon-api/mock/json_rest_handler_mock.go | 18 +- .../beacon-api/mock/state_validators_mock.go | 2 +- .../client/beacon-api/state_validators.go | 27 ++- .../beacon-api/state_validators_test.go | 191 +++++++++++++++++- .../submit_aggregate_selection_proof_test.go | 22 ++ .../client/beacon-api/sync_committee_test.go | 22 ++ .../client/grpc-api/grpc_validator_client.go | 4 + validator/client/iface/BUILD.bazel | 1 + validator/client/iface/validator_client.go | 55 +++++ validator/client/service.go | 5 + validator/client/validator.go | 123 ++++++++++- validator/client/validator_test.go | 92 ++++++++- validator/node/node.go | 1 + 32 files changed, 925 insertions(+), 108 deletions(-) create mode 100644 validator/client/beacon-api/beacon_committee_selections.go create mode 100644 validator/client/beacon-api/beacon_committee_selections_test.go diff --git a/beacon-chain/rpc/eth/validator/handlers.go b/beacon-chain/rpc/eth/validator/handlers.go index d6f069048dda..3f405aee8aec 100644 --- a/beacon-chain/rpc/eth/validator/handlers.go +++ b/beacon-chain/rpc/eth/validator/handlers.go @@ -1086,6 +1086,12 @@ func (s *Server) GetLiveness(w http.ResponseWriter, r *http.Request) { httputil.WriteJson(w, resp) } +// BeaconCommitteeSelections responds with appropriate message and status code according the spec: +// https://ethereum.github.io/beacon-APIs/#/Validator/submitBeaconCommitteeSelections. +func (s *Server) BeaconCommitteeSelections(w http.ResponseWriter, _ *http.Request) { + httputil.HandleError(w, "Endpoint not implemented", 501) +} + // attestationDependentRoot is get_block_root_at_slot(state, compute_start_slot_at_epoch(epoch - 1) - 1) // or the genesis block root in the case of underflow. func attestationDependentRoot(s state.BeaconState, epoch primitives.Epoch) ([]byte, error) { diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index f89d2b4302d8..752223f1ae7f 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -228,6 +228,7 @@ func (s *Service) initializeValidatorServerRoutes(validatorServer *validator.Ser s.cfg.Router.HandleFunc("/eth/v2/validator/blocks/{slot}", validatorServer.ProduceBlockV2).Methods(http.MethodGet) s.cfg.Router.HandleFunc("/eth/v1/validator/blinded_blocks/{slot}", validatorServer.ProduceBlindedBlock).Methods(http.MethodGet) s.cfg.Router.HandleFunc("/eth/v3/validator/blocks/{slot}", validatorServer.ProduceBlockV3).Methods(http.MethodGet) + s.cfg.Router.HandleFunc("/eth/v1/validator/beacon_committee_selections", validatorServer.BeaconCommitteeSelections).Methods(http.MethodPost) } func (s *Service) initializeNodeServerRoutes(nodeServer *node.Server) { diff --git a/beacon-chain/rpc/service_test.go b/beacon-chain/rpc/service_test.go index a24399663d1e..852acaf75342 100644 --- a/beacon-chain/rpc/service_test.go +++ b/beacon-chain/rpc/service_test.go @@ -152,8 +152,8 @@ func TestServer_InitializeRoutes(t *testing.T) { "/eth/v1/validator/aggregate_and_proofs": {http.MethodPost}, "/eth/v1/validator/beacon_committee_subscriptions": {http.MethodPost}, "/eth/v1/validator/sync_committee_subscriptions": {http.MethodPost}, - //"/eth/v1/validator/beacon_committee_selections": {http.MethodPost}, // not implemented - "/eth/v1/validator/sync_committee_contribution": {http.MethodGet}, + "/eth/v1/validator/beacon_committee_selections": {http.MethodPost}, + "/eth/v1/validator/sync_committee_contribution": {http.MethodGet}, //"/eth/v1/validator/sync_committee_selections": {http.MethodPost}, // not implemented "/eth/v1/validator/contribution_and_proofs": {http.MethodPost}, "/eth/v1/validator/prepare_beacon_proposer": {http.MethodPost}, diff --git a/cmd/validator/flags/flags.go b/cmd/validator/flags/flags.go index d9eb8d66cf1e..e24c06490383 100644 --- a/cmd/validator/flags/flags.go +++ b/cmd/validator/flags/flags.go @@ -377,6 +377,13 @@ var ( Usage: "Sets the maximum size for one batch of validator registrations. Use a non-positive value to disable batching.", Value: 0, } + + // EnableDistributed enables the usage of prysm validator client in a Distributed Validator Cluster. + EnableDistributed = &cli.BoolFlag{ + Name: "distributed", + Usage: "To enable the use of prysm validator client in Distributed Validator Cluster", + Value: false, + } ) // DefaultValidatorDir returns OS-specific default validator directory. diff --git a/cmd/validator/main.go b/cmd/validator/main.go index 150425dd117d..099ced70644a 100644 --- a/cmd/validator/main.go +++ b/cmd/validator/main.go @@ -74,6 +74,7 @@ var appFlags = []cli.Flag{ flags.WalletDirFlag, flags.EnableWebFlag, flags.GraffitiFileFlag, + flags.EnableDistributed, // Consensys' Web3Signer flags flags.Web3SignerURLFlag, flags.Web3SignerPublicValidatorKeysFlag, diff --git a/cmd/validator/usage.go b/cmd/validator/usage.go index 8369551336f4..0746f4b7d7d8 100644 --- a/cmd/validator/usage.go +++ b/cmd/validator/usage.go @@ -122,6 +122,7 @@ var appHelpFlagGroups = []flagGroup{ flags.EnableBuilderFlag, flags.BuilderGasLimitFlag, flags.ValidatorsRegistrationBatchSizeFlag, + flags.EnableDistributed, }, }, { diff --git a/crypto/bls/common/mock/interface_mock.go b/crypto/bls/common/mock/interface_mock.go index 3400c9d80dee..939ccf9b3784 100644 --- a/crypto/bls/common/mock/interface_mock.go +++ b/crypto/bls/common/mock/interface_mock.go @@ -12,30 +12,30 @@ import ( ) // MockSecretKey is a mock of SecretKey interface. -type SecretKey struct { +type MockSecretKey struct { ctrl *gomock.Controller - recorder *SecretKeyMockRecorder + recorder *MockSecretKeyMockRecorder } // MockSecretKeyMockRecorder is the mock recorder for MockSecretKey. -type SecretKeyMockRecorder struct { - mock *SecretKey +type MockSecretKeyMockRecorder struct { + mock *MockSecretKey } -// NewSecretKey creates a new mock instance. -func NewSecretKey(ctrl *gomock.Controller) *SecretKey { - mock := &SecretKey{ctrl: ctrl} - mock.recorder = &SecretKeyMockRecorder{mock} +// NewMockSecretKey creates a new mock instance. +func NewMockSecretKey(ctrl *gomock.Controller) *MockSecretKey { + mock := &MockSecretKey{ctrl: ctrl} + mock.recorder = &MockSecretKeyMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *SecretKey) EXPECT() *SecretKeyMockRecorder { +func (m *MockSecretKey) EXPECT() *MockSecretKeyMockRecorder { return m.recorder } // Marshal mocks base method. -func (m *SecretKey) Marshal() []byte { +func (m *MockSecretKey) Marshal() []byte { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Marshal") ret0, _ := ret[0].([]byte) @@ -43,13 +43,13 @@ func (m *SecretKey) Marshal() []byte { } // Marshal indicates an expected call of Marshal. -func (mr *SecretKeyMockRecorder) Marshal() *gomock.Call { +func (mr *MockSecretKeyMockRecorder) Marshal() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Marshal", reflect.TypeOf((*SecretKey)(nil).Marshal)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Marshal", reflect.TypeOf((*MockSecretKey)(nil).Marshal)) } // PublicKey mocks base method. -func (m *SecretKey) PublicKey() common.PublicKey { +func (m *MockSecretKey) PublicKey() common.PublicKey { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PublicKey") ret0, _ := ret[0].(common.PublicKey) @@ -57,13 +57,13 @@ func (m *SecretKey) PublicKey() common.PublicKey { } // PublicKey indicates an expected call of PublicKey. -func (mr *SecretKeyMockRecorder) PublicKey() *gomock.Call { +func (mr *MockSecretKeyMockRecorder) PublicKey() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublicKey", reflect.TypeOf((*SecretKey)(nil).PublicKey)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublicKey", reflect.TypeOf((*MockSecretKey)(nil).PublicKey)) } // Sign mocks base method. -func (m *SecretKey) Sign(msg []byte) common.Signature { +func (m *MockSecretKey) Sign(msg []byte) common.Signature { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Sign", msg) ret0, _ := ret[0].(common.Signature) @@ -71,36 +71,36 @@ func (m *SecretKey) Sign(msg []byte) common.Signature { } // Sign indicates an expected call of Sign. -func (mr *SecretKeyMockRecorder) Sign(msg interface{}) *gomock.Call { +func (mr *MockSecretKeyMockRecorder) Sign(msg interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Sign", reflect.TypeOf((*SecretKey)(nil).Sign), msg) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Sign", reflect.TypeOf((*MockSecretKey)(nil).Sign), msg) } // MockPublicKey is a mock of PublicKey interface. -type PublicKey struct { +type MockPublicKey struct { ctrl *gomock.Controller - recorder *PublicKeyMockRecorder + recorder *MockPublicKeyMockRecorder } // MockPublicKeyMockRecorder is the mock recorder for MockPublicKey. -type PublicKeyMockRecorder struct { - mock *PublicKey +type MockPublicKeyMockRecorder struct { + mock *MockPublicKey } -// NewPublicKey creates a new mock instance. -func NewPublicKey(ctrl *gomock.Controller) *PublicKey { - mock := &PublicKey{ctrl: ctrl} - mock.recorder = &PublicKeyMockRecorder{mock} +// NewMockPublicKey creates a new mock instance. +func NewMockPublicKey(ctrl *gomock.Controller) *MockPublicKey { + mock := &MockPublicKey{ctrl: ctrl} + mock.recorder = &MockPublicKeyMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *PublicKey) EXPECT() *PublicKeyMockRecorder { +func (m *MockPublicKey) EXPECT() *MockPublicKeyMockRecorder { return m.recorder } // Aggregate mocks base method. -func (m *PublicKey) Aggregate(p2 common.PublicKey) common.PublicKey { +func (m *MockPublicKey) Aggregate(p2 common.PublicKey) common.PublicKey { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Aggregate", p2) ret0, _ := ret[0].(common.PublicKey) @@ -108,13 +108,13 @@ func (m *PublicKey) Aggregate(p2 common.PublicKey) common.PublicKey { } // Aggregate indicates an expected call of Aggregate. -func (mr *PublicKeyMockRecorder) Aggregate(p2 interface{}) *gomock.Call { +func (mr *MockPublicKeyMockRecorder) Aggregate(p2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Aggregate", reflect.TypeOf((*PublicKey)(nil).Aggregate), p2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Aggregate", reflect.TypeOf((*MockPublicKey)(nil).Aggregate), p2) } // Copy mocks base method. -func (m *PublicKey) Copy() common.PublicKey { +func (m *MockPublicKey) Copy() common.PublicKey { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Copy") ret0, _ := ret[0].(common.PublicKey) @@ -122,13 +122,13 @@ func (m *PublicKey) Copy() common.PublicKey { } // Copy indicates an expected call of Copy. -func (mr *PublicKeyMockRecorder) Copy() *gomock.Call { +func (mr *MockPublicKeyMockRecorder) Copy() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Copy", reflect.TypeOf((*PublicKey)(nil).Copy)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Copy", reflect.TypeOf((*MockPublicKey)(nil).Copy)) } // Equals mocks base method. -func (m *PublicKey) Equals(p2 common.PublicKey) bool { +func (m *MockPublicKey) Equals(p2 common.PublicKey) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Equals", p2) ret0, _ := ret[0].(bool) @@ -136,13 +136,13 @@ func (m *PublicKey) Equals(p2 common.PublicKey) bool { } // Equals indicates an expected call of Equals. -func (mr *PublicKeyMockRecorder) Equals(p2 interface{}) *gomock.Call { +func (mr *MockPublicKeyMockRecorder) Equals(p2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Equals", reflect.TypeOf((*PublicKey)(nil).Equals), p2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Equals", reflect.TypeOf((*MockPublicKey)(nil).Equals), p2) } // IsInfinite mocks base method. -func (m *PublicKey) IsInfinite() bool { +func (m *MockPublicKey) IsInfinite() bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IsInfinite") ret0, _ := ret[0].(bool) @@ -150,13 +150,13 @@ func (m *PublicKey) IsInfinite() bool { } // IsInfinite indicates an expected call of IsInfinite. -func (mr *PublicKeyMockRecorder) IsInfinite() *gomock.Call { +func (mr *MockPublicKeyMockRecorder) IsInfinite() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsInfinite", reflect.TypeOf((*PublicKey)(nil).IsInfinite)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsInfinite", reflect.TypeOf((*MockPublicKey)(nil).IsInfinite)) } // Marshal mocks base method. -func (m *PublicKey) Marshal() []byte { +func (m *MockPublicKey) Marshal() []byte { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Marshal") ret0, _ := ret[0].([]byte) @@ -164,36 +164,36 @@ func (m *PublicKey) Marshal() []byte { } // Marshal indicates an expected call of Marshal. -func (mr *PublicKeyMockRecorder) Marshal() *gomock.Call { +func (mr *MockPublicKeyMockRecorder) Marshal() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Marshal", reflect.TypeOf((*PublicKey)(nil).Marshal)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Marshal", reflect.TypeOf((*MockPublicKey)(nil).Marshal)) } // MockSignature is a mock of Signature interface. -type Signature struct { +type MockSignature struct { ctrl *gomock.Controller - recorder *SignatureMockRecorder + recorder *MockSignatureMockRecorder } // MockSignatureMockRecorder is the mock recorder for MockSignature. -type SignatureMockRecorder struct { - mock *Signature +type MockSignatureMockRecorder struct { + mock *MockSignature } -// NewSignature creates a new mock instance. -func NewSignature(ctrl *gomock.Controller) *Signature { - mock := &Signature{ctrl: ctrl} - mock.recorder = &SignatureMockRecorder{mock} +// NewMockSignature creates a new mock instance. +func NewMockSignature(ctrl *gomock.Controller) *MockSignature { + mock := &MockSignature{ctrl: ctrl} + mock.recorder = &MockSignatureMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *Signature) EXPECT() *SignatureMockRecorder { +func (m *MockSignature) EXPECT() *MockSignatureMockRecorder { return m.recorder } // AggregateVerify mocks base method. -func (m *Signature) AggregateVerify(pubKeys []common.PublicKey, msgs [][32]byte) bool { +func (m *MockSignature) AggregateVerify(pubKeys []common.PublicKey, msgs [][32]byte) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AggregateVerify", pubKeys, msgs) ret0, _ := ret[0].(bool) @@ -201,13 +201,13 @@ func (m *Signature) AggregateVerify(pubKeys []common.PublicKey, msgs [][32]byte) } // AggregateVerify indicates an expected call of AggregateVerify. -func (mr *SignatureMockRecorder) AggregateVerify(pubKeys, msgs interface{}) *gomock.Call { +func (mr *MockSignatureMockRecorder) AggregateVerify(pubKeys, msgs interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateVerify", reflect.TypeOf((*Signature)(nil).AggregateVerify), pubKeys, msgs) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateVerify", reflect.TypeOf((*MockSignature)(nil).AggregateVerify), pubKeys, msgs) } // Copy mocks base method. -func (m *Signature) Copy() common.Signature { +func (m *MockSignature) Copy() common.Signature { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Copy") ret0, _ := ret[0].(common.Signature) @@ -215,13 +215,13 @@ func (m *Signature) Copy() common.Signature { } // Copy indicates an expected call of Copy. -func (mr *SignatureMockRecorder) Copy() *gomock.Call { +func (mr *MockSignatureMockRecorder) Copy() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Copy", reflect.TypeOf((*Signature)(nil).Copy)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Copy", reflect.TypeOf((*MockSignature)(nil).Copy)) } // Eth2FastAggregateVerify mocks base method. -func (m *Signature) Eth2FastAggregateVerify(pubKeys []common.PublicKey, msg [32]byte) bool { +func (m *MockSignature) Eth2FastAggregateVerify(pubKeys []common.PublicKey, msg [32]byte) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Eth2FastAggregateVerify", pubKeys, msg) ret0, _ := ret[0].(bool) @@ -229,13 +229,13 @@ func (m *Signature) Eth2FastAggregateVerify(pubKeys []common.PublicKey, msg [32] } // Eth2FastAggregateVerify indicates an expected call of Eth2FastAggregateVerify. -func (mr *SignatureMockRecorder) Eth2FastAggregateVerify(pubKeys, msg interface{}) *gomock.Call { +func (mr *MockSignatureMockRecorder) Eth2FastAggregateVerify(pubKeys, msg interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Eth2FastAggregateVerify", reflect.TypeOf((*Signature)(nil).Eth2FastAggregateVerify), pubKeys, msg) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Eth2FastAggregateVerify", reflect.TypeOf((*MockSignature)(nil).Eth2FastAggregateVerify), pubKeys, msg) } // FastAggregateVerify mocks base method. -func (m *Signature) FastAggregateVerify(pubKeys []common.PublicKey, msg [32]byte) bool { +func (m *MockSignature) FastAggregateVerify(pubKeys []common.PublicKey, msg [32]byte) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FastAggregateVerify", pubKeys, msg) ret0, _ := ret[0].(bool) @@ -243,13 +243,13 @@ func (m *Signature) FastAggregateVerify(pubKeys []common.PublicKey, msg [32]byte } // FastAggregateVerify indicates an expected call of FastAggregateVerify. -func (mr *SignatureMockRecorder) FastAggregateVerify(pubKeys, msg interface{}) *gomock.Call { +func (mr *MockSignatureMockRecorder) FastAggregateVerify(pubKeys, msg interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FastAggregateVerify", reflect.TypeOf((*Signature)(nil).FastAggregateVerify), pubKeys, msg) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FastAggregateVerify", reflect.TypeOf((*MockSignature)(nil).FastAggregateVerify), pubKeys, msg) } // Marshal mocks base method. -func (m *Signature) Marshal() []byte { +func (m *MockSignature) Marshal() []byte { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Marshal") ret0, _ := ret[0].([]byte) @@ -257,13 +257,13 @@ func (m *Signature) Marshal() []byte { } // Marshal indicates an expected call of Marshal. -func (mr *SignatureMockRecorder) Marshal() *gomock.Call { +func (mr *MockSignatureMockRecorder) Marshal() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Marshal", reflect.TypeOf((*Signature)(nil).Marshal)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Marshal", reflect.TypeOf((*MockSignature)(nil).Marshal)) } // Verify mocks base method. -func (m *Signature) Verify(pubKey common.PublicKey, msg []byte) bool { +func (m *MockSignature) Verify(pubKey common.PublicKey, msg []byte) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Verify", pubKey, msg) ret0, _ := ret[0].(bool) @@ -271,7 +271,7 @@ func (m *Signature) Verify(pubKey common.PublicKey, msg []byte) bool { } // Verify indicates an expected call of Verify. -func (mr *SignatureMockRecorder) Verify(pubKey, msg interface{}) *gomock.Call { +func (mr *MockSignatureMockRecorder) Verify(pubKey, msg interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Verify", reflect.TypeOf((*Signature)(nil).Verify), pubKey, msg) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Verify", reflect.TypeOf((*MockSignature)(nil).Verify), pubKey, msg) } diff --git a/testing/validator-mock/validator_client_mock.go b/testing/validator-mock/validator_client_mock.go index d71baea2b21b..3bbe5c9dc60d 100644 --- a/testing/validator-mock/validator_client_mock.go +++ b/testing/validator-mock/validator_client_mock.go @@ -11,6 +11,7 @@ import ( gomock "github.com/golang/mock/gomock" primitives "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + iface "github.com/prysmaticlabs/prysm/v4/validator/client/iface" emptypb "google.golang.org/protobuf/types/known/emptypb" ) @@ -81,6 +82,21 @@ func (mr *MockValidatorClientMockRecorder) EventStreamIsRunning() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventStreamIsRunning", reflect.TypeOf((*MockValidatorClient)(nil).EventStreamIsRunning)) } +// GetAggregatedSelections mocks base method. +func (m *MockValidatorClient) GetAggregatedSelections(arg0 context.Context, arg1 []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAggregatedSelections", arg0, arg1) + ret0, _ := ret[0].([]iface.BeaconCommitteeSelection) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAggregatedSelections indicates an expected call of GetAggregatedSelections. +func (mr *MockValidatorClientMockRecorder) GetAggregatedSelections(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAggregatedSelections", reflect.TypeOf((*MockValidatorClient)(nil).GetAggregatedSelections), arg0, arg1) +} + // GetAttestationData mocks base method. func (m *MockValidatorClient) GetAttestationData(arg0 context.Context, arg1 *eth.AttestationDataRequest) (*eth.AttestationData, error) { m.ctrl.T.Helper() diff --git a/validator/client/aggregate.go b/validator/client/aggregate.go index 37cc1d2aeb5d..ef112ee0341d 100644 --- a/validator/client/aggregate.go +++ b/validator/client/aggregate.go @@ -53,13 +53,25 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives v.aggregatedSlotCommitteeIDCache.Add(k, true) v.aggregatedSlotCommitteeIDCacheLock.Unlock() - slotSig, err := v.signSlotWithSelectionProof(ctx, pubKey, slot) - if err != nil { - log.WithError(err).Error("Could not sign slot") - if v.emitAccountMetrics { - ValidatorAggFailVec.WithLabelValues(fmtKey).Inc() + var slotSig []byte + if v.distributed { + slotSig, err = v.getAttSelection(attSelectionKey{slot: slot, index: duty.ValidatorIndex}) + if err != nil { + log.WithError(err).Error("Could not find aggregated selection proof") + if v.emitAccountMetrics { + ValidatorAggFailVec.WithLabelValues(fmtKey).Inc() + } + return + } + } else { + slotSig, err = v.signSlotWithSelectionProof(ctx, pubKey, slot) + if err != nil { + log.WithError(err).Error("Could not sign slot") + if v.emitAccountMetrics { + ValidatorAggFailVec.WithLabelValues(fmtKey).Inc() + } + return } - return } // As specified in spec, an aggregator should wait until two thirds of the way through slot diff --git a/validator/client/aggregate_test.go b/validator/client/aggregate_test.go index 93f0032f10c0..a9839846c5bb 100644 --- a/validator/client/aggregate_test.go +++ b/validator/client/aggregate_test.go @@ -5,6 +5,8 @@ import ( "errors" "testing" + "github.com/prysmaticlabs/prysm/v4/validator/client/iface" + "github.com/golang/mock/gomock" "github.com/prysmaticlabs/go-bitfield" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" @@ -116,6 +118,63 @@ func TestSubmitAggregateAndProof_Ok(t *testing.T) { validator.SubmitAggregateAndProof(context.Background(), 0, pubKey) } +func TestSubmitAggregateAndProof_Distributed(t *testing.T) { + validatorIdx := primitives.ValidatorIndex(123) + slot := primitives.Slot(456) + ctx := context.Background() + + validator, m, validatorKey, finish := setup(t) + defer finish() + + var pubKey [fieldparams.BLSPubkeyLength]byte + copy(pubKey[:], validatorKey.PublicKey().Marshal()) + validator.duties = ðpb.DutiesResponse{ + CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + { + PublicKey: validatorKey.PublicKey().Marshal(), + ValidatorIndex: validatorIdx, + AttesterSlot: slot, + }, + }, + } + + validator.distributed = true + validator.attSelections = make(map[attSelectionKey]iface.BeaconCommitteeSelection) + validator.attSelections[attSelectionKey{ + slot: slot, + index: 123, + }] = iface.BeaconCommitteeSelection{ + SelectionProof: make([]byte, 96), + Slot: slot, + ValidatorIndex: validatorIdx, + } + + m.validatorClient.EXPECT().SubmitAggregateSelectionProof( + gomock.Any(), // ctx + gomock.AssignableToTypeOf(ðpb.AggregateSelectionRequest{}), + ).Return(ðpb.AggregateSelectionResponse{ + AggregateAndProof: ðpb.AggregateAttestationAndProof{ + AggregatorIndex: 0, + Aggregate: util.HydrateAttestation(ðpb.Attestation{ + AggregationBits: make([]byte, 1), + }), + SelectionProof: make([]byte, 96), + }, + }, nil) + + m.validatorClient.EXPECT().DomainData( + gomock.Any(), // ctx + gomock.Any(), // epoch + ).Return(ðpb.DomainResponse{SignatureDomain: make([]byte, 32)}, nil /*err*/) + + m.validatorClient.EXPECT().SubmitSignedAggregateSelectionProof( + gomock.Any(), // ctx + gomock.AssignableToTypeOf(ðpb.SignedAggregateSubmitRequest{}), + ).Return(ðpb.SignedAggregateSubmitResponse{AttestationDataRoot: make([]byte, 32)}, nil) + + validator.SubmitAggregateAndProof(ctx, slot, pubKey) +} + func TestWaitForSlotTwoThird_WaitCorrectly(t *testing.T) { validator, _, _, finish := setup(t) defer finish() diff --git a/validator/client/beacon-api/BUILD.bazel b/validator/client/beacon-api/BUILD.bazel index 34e4234d476f..b9dc52531681 100644 --- a/validator/client/beacon-api/BUILD.bazel +++ b/validator/client/beacon-api/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "beacon_block_converter.go", "beacon_block_json_helpers.go", "beacon_block_proto_helpers.go", + "beacon_committee_selections.go", "domain_data.go", "doppelganger.go", "duties.go", @@ -77,6 +78,7 @@ go_test( "beacon_block_converter_test.go", "beacon_block_json_helpers_test.go", "beacon_block_proto_helpers_test.go", + "beacon_committee_selections_test.go", "domain_data_test.go", "doppelganger_test.go", "duties_test.go", diff --git a/validator/client/beacon-api/activation_test.go b/validator/client/beacon-api/activation_test.go index 8aea9ded87ec..319d4c8d9a7a 100644 --- a/validator/client/beacon-api/activation_test.go +++ b/validator/client/beacon-api/activation_test.go @@ -290,6 +290,14 @@ func TestActivation_JsonResponseError(t *testing.T) { errors.New("some specific json error"), ).Times(1) + jsonRestHandler.EXPECT().Get( + ctx, + gomock.Any(), + gomock.Any(), + ).Return( + errors.New("some specific json error"), + ).Times(1) + validatorClient := beaconApiValidatorClient{ stateValidatorsProvider: beaconApiStateValidatorsProvider{ jsonRestHandler: jsonRestHandler, diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index df7fc6a06333..db03e4184f64 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -175,3 +175,7 @@ func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error { func (c *beaconApiValidatorClient) EventStreamIsRunning() bool { return c.eventHandler.running } + +func (c *beaconApiValidatorClient) GetAggregatedSelections(ctx context.Context, selections []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) { + return c.getAggregatedSelection(ctx, selections) +} diff --git a/validator/client/beacon-api/beacon_committee_selections.go b/validator/client/beacon-api/beacon_committee_selections.go new file mode 100644 index 000000000000..c6c74883606e --- /dev/null +++ b/validator/client/beacon-api/beacon_committee_selections.go @@ -0,0 +1,36 @@ +package beacon_api + +import ( + "bytes" + "context" + "encoding/json" + + "github.com/prysmaticlabs/prysm/v4/validator/client/iface" + + "github.com/pkg/errors" +) + +type aggregatedSelectionResponse struct { + Data []iface.BeaconCommitteeSelection `json:"data"` +} + +func (c *beaconApiValidatorClient) getAggregatedSelection(ctx context.Context, selections []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) { + body, err := json.Marshal(selections) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal selections") + } + + var resp aggregatedSelectionResponse + err = c.jsonRestHandler.Post(ctx, "/eth/v1/validator/beacon_committee_selections", nil, bytes.NewBuffer(body), &resp) + if err != nil { + return nil, errors.Wrap(err, "error calling post endpoint") + } + if len(resp.Data) == 0 { + return nil, errors.New("no aggregated selection returned") + } + if len(selections) != len(resp.Data) { + return nil, errors.New("mismatching number of selections") + } + + return resp.Data, nil +} diff --git a/validator/client/beacon-api/beacon_committee_selections_test.go b/validator/client/beacon-api/beacon_committee_selections_test.go new file mode 100644 index 000000000000..344fece74598 --- /dev/null +++ b/validator/client/beacon-api/beacon_committee_selections_test.go @@ -0,0 +1,124 @@ +package beacon_api + +import ( + "bytes" + "context" + "encoding/json" + "testing" + + "github.com/prysmaticlabs/prysm/v4/validator/client/iface" + + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api/mock" + test_helpers "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api/test-helpers" +) + +func TestGetAggregatedSelections(t *testing.T) { + testcases := []struct { + name string + req []iface.BeaconCommitteeSelection + res []iface.BeaconCommitteeSelection + endpointError error + expectedErrorMessage string + }{ + { + name: "valid", + req: []iface.BeaconCommitteeSelection{ + { + SelectionProof: test_helpers.FillByteSlice(96, 82), + Slot: 75, + ValidatorIndex: 76, + }, + }, + res: []iface.BeaconCommitteeSelection{ + { + SelectionProof: test_helpers.FillByteSlice(96, 100), + Slot: 75, + ValidatorIndex: 76, + }, + }, + }, + { + name: "endpoint error", + req: []iface.BeaconCommitteeSelection{ + { + SelectionProof: test_helpers.FillByteSlice(96, 82), + Slot: 75, + ValidatorIndex: 76, + }, + }, + endpointError: errors.New("bad request"), + expectedErrorMessage: "bad request", + }, + { + name: "no response error", + req: []iface.BeaconCommitteeSelection{ + { + SelectionProof: test_helpers.FillByteSlice(96, 82), + Slot: 75, + ValidatorIndex: 76, + }, + }, + expectedErrorMessage: "no aggregated selection returned", + }, + { + name: "mismatch response", + req: []iface.BeaconCommitteeSelection{ + { + SelectionProof: test_helpers.FillByteSlice(96, 82), + Slot: 75, + ValidatorIndex: 76, + }, + { + SelectionProof: test_helpers.FillByteSlice(96, 102), + Slot: 75, + ValidatorIndex: 79, + }, + }, + res: []iface.BeaconCommitteeSelection{ + { + SelectionProof: test_helpers.FillByteSlice(96, 100), + Slot: 75, + ValidatorIndex: 76, + }, + }, + expectedErrorMessage: "mismatching number of selections", + }, + } + + for _, test := range testcases { + t.Run(test.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + jsonRestHandler := mock.NewMockJsonRestHandler(ctrl) + + reqBody, err := json.Marshal(test.req) + require.NoError(t, err) + + ctx := context.Background() + jsonRestHandler.EXPECT().Post( + ctx, + "/eth/v1/validator/beacon_committee_selections", + nil, + bytes.NewBuffer(reqBody), + &aggregatedSelectionResponse{}, + ).SetArg( + 4, + aggregatedSelectionResponse{Data: test.res}, + ).Return( + test.endpointError, + ).Times(1) + + validatorClient := &beaconApiValidatorClient{jsonRestHandler: jsonRestHandler} + res, err := validatorClient.GetAggregatedSelections(ctx, test.req) + if test.expectedErrorMessage != "" { + require.ErrorContains(t, test.expectedErrorMessage, err) + return + } + + require.NoError(t, err) + require.DeepEqual(t, test.res, res) + }) + } +} diff --git a/validator/client/beacon-api/index_test.go b/validator/client/beacon-api/index_test.go index 23c0459906b3..91102c64038e 100644 --- a/validator/client/beacon-api/index_test.go +++ b/validator/client/beacon-api/index_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "net/url" "testing" "github.com/ethereum/go-ethereum/common/hexutil" @@ -26,6 +27,7 @@ func getPubKeyAndReqBuffer(t *testing.T) ([]byte, *bytes.Buffer) { Ids: []string{stringPubKey}, Statuses: []string{}, } + reqBytes, err := json.Marshal(req) require.NoError(t, err) return pubKey, bytes.NewBuffer(reqBytes) @@ -192,6 +194,27 @@ func TestIndex_JsonResponseError(t *testing.T) { errors.New("some specific json error"), ).Times(1) + req := structs.GetValidatorsRequest{ + Ids: []string{stringPubKey}, + Statuses: []string{}, + } + + queryParams := url.Values{} + for _, id := range req.Ids { + queryParams.Add("id", id) + } + for _, st := range req.Statuses { + queryParams.Add("status", st) + } + + jsonRestHandler.EXPECT().Get( + ctx, + buildURL("/eth/v1/beacon/states/head/validators", queryParams), + &stateValidatorsResponseJson, + ).Return( + errors.New("some specific json error"), + ).Times(1) + validatorClient := beaconApiValidatorClient{ stateValidatorsProvider: beaconApiStateValidatorsProvider{ jsonRestHandler: jsonRestHandler, diff --git a/validator/client/beacon-api/mock/beacon_block_converter_mock.go b/validator/client/beacon-api/mock/beacon_block_converter_mock.go index b75196b274a9..dd636f82c790 100644 --- a/validator/client/beacon-api/mock/beacon_block_converter_mock.go +++ b/validator/client/beacon-api/mock/beacon_block_converter_mock.go @@ -8,7 +8,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - "github.com/prysmaticlabs/prysm/v4/api/server/structs" + structs "github.com/prysmaticlabs/prysm/v4/api/server/structs" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ) diff --git a/validator/client/beacon-api/mock/duties_mock.go b/validator/client/beacon-api/mock/duties_mock.go index 303d632a1680..1c0f46626306 100644 --- a/validator/client/beacon-api/mock/duties_mock.go +++ b/validator/client/beacon-api/mock/duties_mock.go @@ -9,7 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - "github.com/prysmaticlabs/prysm/v4/api/server/structs" + structs "github.com/prysmaticlabs/prysm/v4/api/server/structs" primitives "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ) diff --git a/validator/client/beacon-api/mock/genesis_mock.go b/validator/client/beacon-api/mock/genesis_mock.go index a0e0aad1d2e0..8f38f63e6fa7 100644 --- a/validator/client/beacon-api/mock/genesis_mock.go +++ b/validator/client/beacon-api/mock/genesis_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api (interfaces: GenesisProvider) +// Source: validator/client/beacon-api/genesis.go // Package mock is a generated GoMock package. package mock @@ -9,7 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - "github.com/prysmaticlabs/prysm/v4/api/server/structs" + structs "github.com/prysmaticlabs/prysm/v4/api/server/structs" ) // MockGenesisProvider is a mock of GenesisProvider interface. @@ -36,16 +36,16 @@ func (m *MockGenesisProvider) EXPECT() *MockGenesisProviderMockRecorder { } // GetGenesis mocks base method. -func (m *MockGenesisProvider) GetGenesis(arg0 context.Context) (*structs.Genesis, error) { +func (m *MockGenesisProvider) GetGenesis(ctx context.Context) (*structs.Genesis, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetGenesis", arg0) + ret := m.ctrl.Call(m, "GetGenesis", ctx) ret0, _ := ret[0].(*structs.Genesis) ret1, _ := ret[1].(error) return ret0, ret1 } // GetGenesis indicates an expected call of GetGenesis. -func (mr *MockGenesisProviderMockRecorder) GetGenesis(arg0 interface{}) *gomock.Call { +func (mr *MockGenesisProviderMockRecorder) GetGenesis(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGenesis", reflect.TypeOf((*MockGenesisProvider)(nil).GetGenesis), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGenesis", reflect.TypeOf((*MockGenesisProvider)(nil).GetGenesis), ctx) } diff --git a/validator/client/beacon-api/mock/json_rest_handler_mock.go b/validator/client/beacon-api/mock/json_rest_handler_mock.go index ae95ecd74ffd..b939c1221b1c 100644 --- a/validator/client/beacon-api/mock/json_rest_handler_mock.go +++ b/validator/client/beacon-api/mock/json_rest_handler_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api (interfaces: JsonRestHandler) +// Source: validator/client/beacon-api/json_rest_handler.go // Package mock is a generated GoMock package. package mock @@ -36,29 +36,29 @@ func (m *MockJsonRestHandler) EXPECT() *MockJsonRestHandlerMockRecorder { } // Get mocks base method. -func (m *MockJsonRestHandler) Get(arg0 context.Context, arg1 string, arg2 interface{}) error { +func (m *MockJsonRestHandler) Get(ctx context.Context, endpoint string, resp interface{}) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Get", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Get", ctx, endpoint, resp) ret0, _ := ret[0].(error) return ret0 } // Get indicates an expected call of Get. -func (mr *MockJsonRestHandlerMockRecorder) Get(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockJsonRestHandlerMockRecorder) Get(ctx, endpoint, resp interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockJsonRestHandler)(nil).Get), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockJsonRestHandler)(nil).Get), ctx, endpoint, resp) } // Post mocks base method. -func (m *MockJsonRestHandler) Post(arg0 context.Context, arg1 string, arg2 map[string]string, arg3 *bytes.Buffer, arg4 interface{}) error { +func (m *MockJsonRestHandler) Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp interface{}) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Post", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "Post", ctx, endpoint, headers, data, resp) ret0, _ := ret[0].(error) return ret0 } // Post indicates an expected call of Post. -func (mr *MockJsonRestHandlerMockRecorder) Post(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, resp interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp) } diff --git a/validator/client/beacon-api/mock/state_validators_mock.go b/validator/client/beacon-api/mock/state_validators_mock.go index 39fd360c9dff..2a36fc66b7fd 100644 --- a/validator/client/beacon-api/mock/state_validators_mock.go +++ b/validator/client/beacon-api/mock/state_validators_mock.go @@ -9,7 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - "github.com/prysmaticlabs/prysm/v4/api/server/structs" + structs "github.com/prysmaticlabs/prysm/v4/api/server/structs" primitives "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ) diff --git a/validator/client/beacon-api/state_validators.go b/validator/client/beacon-api/state_validators.go index 9a19ad267bee..e87406e2ec81 100644 --- a/validator/client/beacon-api/state_validators.go +++ b/validator/client/beacon-api/state_validators.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "net/url" "strconv" "github.com/pkg/errors" @@ -91,7 +92,31 @@ func (c beaconApiStateValidatorsProvider) getStateValidatorsHelper( return nil, errors.Wrapf(err, "failed to marshal request into JSON") } stateValidatorsJson := &structs.GetValidatorsResponse{} - if err = c.jsonRestHandler.Post(ctx, endpoint, nil, bytes.NewBuffer(reqBytes), stateValidatorsJson); err != nil { + // First try POST endpoint to check whether it is supported by the beacon node. + if err = c.jsonRestHandler.Post(ctx, endpoint, nil, bytes.NewBuffer(reqBytes), stateValidatorsJson); err == nil { + if stateValidatorsJson.Data == nil { + return nil, errors.New("stateValidatorsJson.Data is nil") + } + + return stateValidatorsJson, nil + } + + // Re-initialise the response just in case. + stateValidatorsJson = &structs.GetValidatorsResponse{} + + // Seems like POST isn't supported by the beacon node, let's try the GET one. + queryParams := url.Values{} + for _, id := range req.Ids { + queryParams.Add("id", id) + } + for _, st := range req.Statuses { + queryParams.Add("status", st) + } + + query := buildURL(endpoint, queryParams) + + err = c.jsonRestHandler.Get(ctx, query, stateValidatorsJson) + if err != nil { return nil, err } diff --git a/validator/client/beacon-api/state_validators_test.go b/validator/client/beacon-api/state_validators_test.go index faacd4196545..6a8521aa35ac 100644 --- a/validator/client/beacon-api/state_validators_test.go +++ b/validator/client/beacon-api/state_validators_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "net/url" "testing" "github.com/golang/mock/gomock" @@ -15,7 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api/mock" ) -func TestGetStateValidators_Nominal(t *testing.T) { +func TestGetStateValidators_Nominal_POST(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -101,6 +102,112 @@ func TestGetStateValidators_Nominal(t *testing.T) { assert.DeepEqual(t, wanted, actual.Data) } +func TestGetStateValidators_Nominal_GET(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + req := &structs.GetValidatorsRequest{ + Ids: []string{ + "12345", + "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", + "0x80000e851c0f53c3246ff726d7ff7766661ca5e12a07c45c114d208d54f0f8233d4380b2e9aff759d69795d1df905526", + "0x424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242", + "0x800015473bdc3a7f45ef8eb8abc598bc20021e55ad6e6ad1d745aaef9730dd2c28ec08bf42df18451de94dd4a6d24ec5", + }, + Statuses: []string{"active_ongoing", "active_exiting", "exited_slashed", "exited_unslashed"}, + } + reqBytes, err := json.Marshal(req) + require.NoError(t, err) + + stateValidatorsResponseJson := structs.GetValidatorsResponse{} + jsonRestHandler := mock.NewMockJsonRestHandler(ctrl) + + wanted := []*structs.ValidatorContainer{ + { + Index: "12345", + Status: "active_ongoing", + Validator: &structs.Validator{ + Pubkey: "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be19", + }, + }, + { + Index: "55293", + Status: "active_ongoing", + Validator: &structs.Validator{ + Pubkey: "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", + }, + }, + { + Index: "55294", + Status: "active_exiting", + Validator: &structs.Validator{ + Pubkey: "0x80000e851c0f53c3246ff726d7ff7766661ca5e12a07c45c114d208d54f0f8233d4380b2e9aff759d69795d1df905526", + }, + }, + { + Index: "55295", + Status: "exited_slashed", + Validator: &structs.Validator{ + Pubkey: "0x800015473bdc3a7f45ef8eb8abc598bc20021e55ad6e6ad1d745aaef9730dd2c28ec08bf42df18451de94dd4a6d24ec5", + }, + }, + } + + ctx := context.Background() + + // First return an error from POST call. + jsonRestHandler.EXPECT().Post( + ctx, + "/eth/v1/beacon/states/head/validators", + nil, + bytes.NewBuffer(reqBytes), + &stateValidatorsResponseJson, + ).Return( + errors.New("an error"), + ).Times(1) + + // Then try the GET call which will be successful. + queryParams := url.Values{} + for _, id := range req.Ids { + queryParams.Add("id", id) + } + for _, st := range req.Statuses { + queryParams.Add("status", st) + } + + query := buildURL("/eth/v1/beacon/states/head/validators", queryParams) + + jsonRestHandler.EXPECT().Get( + ctx, + query, + &stateValidatorsResponseJson, + ).Return( + nil, + ).SetArg( + 2, + structs.GetValidatorsResponse{ + Data: wanted, + }, + ).Times(1) + + stateValidatorsProvider := beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler} + actual, err := stateValidatorsProvider.GetStateValidators(ctx, []string{ + "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing + "0x80000e851c0f53c3246ff726d7ff7766661ca5e12a07c45c114d208d54f0f8233d4380b2e9aff759d69795d1df905526", // active_exiting + "0x424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242", // does not exist + "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing - duplicate + "0x800015473bdc3a7f45ef8eb8abc598bc20021e55ad6e6ad1d745aaef9730dd2c28ec08bf42df18451de94dd4a6d24ec5", // exited_slashed + }, + []primitives.ValidatorIndex{ + 12345, // active_ongoing + 12345, // active_ongoing - duplicate + }, + []string{"active_ongoing", "active_exiting", "exited_slashed", "exited_unslashed"}, + ) + require.NoError(t, err) + assert.DeepEqual(t, wanted, actual.Data) +} + func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -117,6 +224,7 @@ func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) { ctx := context.Background() + // First call POST. jsonRestHandler.EXPECT().Post( ctx, "/eth/v1/beacon/states/head/validators", @@ -127,6 +235,25 @@ func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) { errors.New("an error"), ).Times(1) + // Call to GET endpoint upon receiving error from POST call. + queryParams := url.Values{} + for _, id := range req.Ids { + queryParams.Add("id", id) + } + for _, st := range req.Statuses { + queryParams.Add("status", st) + } + + query := buildURL("/eth/v1/beacon/states/head/validators", queryParams) + + jsonRestHandler.EXPECT().Get( + ctx, + query, + &stateValidatorsResponseJson, + ).Return( + errors.New("an error"), + ).Times(1) + stateValidatorsProvider := beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler} _, err = stateValidatorsProvider.GetStateValidators(ctx, []string{ "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing @@ -137,7 +264,7 @@ func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) { assert.ErrorContains(t, "an error", err) } -func TestGetStateValidators_DataIsNil(t *testing.T) { +func TestGetStateValidators_DataIsNil_POST(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -175,3 +302,63 @@ func TestGetStateValidators_DataIsNil(t *testing.T) { ) assert.ErrorContains(t, "stateValidatorsJson.Data is nil", err) } + +func TestGetStateValidators_DataIsNil_GET(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + req := &structs.GetValidatorsRequest{ + Ids: []string{"0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13"}, + Statuses: []string{}, + } + reqBytes, err := json.Marshal(req) + require.NoError(t, err) + + ctx := context.Background() + stateValidatorsResponseJson := structs.GetValidatorsResponse{} + jsonRestHandler := mock.NewMockJsonRestHandler(ctrl) + + // First call POST which will return an error. + jsonRestHandler.EXPECT().Post( + ctx, + "/eth/v1/beacon/states/head/validators", + nil, + bytes.NewBuffer(reqBytes), + &stateValidatorsResponseJson, + ).Return( + errors.New("an error"), + ).Times(1) + + // Then call GET which returns nil Data. + queryParams := url.Values{} + for _, id := range req.Ids { + queryParams.Add("id", id) + } + for _, st := range req.Statuses { + queryParams.Add("status", st) + } + + query := buildURL("/eth/v1/beacon/states/head/validators", queryParams) + + jsonRestHandler.EXPECT().Get( + ctx, + query, + &stateValidatorsResponseJson, + ).Return( + nil, + ).SetArg( + 2, + structs.GetValidatorsResponse{ + Data: nil, + }, + ).Times(1) + + stateValidatorsProvider := beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler} + _, err = stateValidatorsProvider.GetStateValidators(ctx, []string{ + "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing + }, + nil, + nil, + ) + assert.ErrorContains(t, "stateValidatorsJson.Data is nil", err) +} diff --git a/validator/client/beacon-api/submit_aggregate_selection_proof_test.go b/validator/client/beacon-api/submit_aggregate_selection_proof_test.go index c4d712e32096..4f3eb7e7c6a8 100644 --- a/validator/client/beacon-api/submit_aggregate_selection_proof_test.go +++ b/validator/client/beacon-api/submit_aggregate_selection_proof_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "net/url" "testing" "github.com/ethereum/go-ethereum/common/hexutil" @@ -201,6 +202,27 @@ func TestSubmitAggregateSelectionProof(t *testing.T) { test.validatorsErr, ).Times(test.validatorsCalled) + if test.validatorsErr != nil { + // Then try the GET call which will also return error. + queryParams := url.Values{} + for _, id := range valsReq.Ids { + queryParams.Add("id", id) + } + for _, st := range valsReq.Statuses { + queryParams.Add("status", st) + } + + query := buildURL("/eth/v1/beacon/states/head/validators", queryParams) + + jsonRestHandler.EXPECT().Get( + ctx, + query, + &structs.GetValidatorsResponse{}, + ).Return( + test.validatorsErr, + ).Times(1) + } + // Call attester duties endpoint to get attester duties. validatorIndicesBytes, err := json.Marshal([]string{validatorIndex}) require.NoError(t, err) diff --git a/validator/client/beacon-api/sync_committee_test.go b/validator/client/beacon-api/sync_committee_test.go index f602ee940576..6c10c4c18d9b 100644 --- a/validator/client/beacon-api/sync_committee_test.go +++ b/validator/client/beacon-api/sync_committee_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "net/url" "testing" "github.com/ethereum/go-ethereum/common/hexutil" @@ -337,6 +338,27 @@ func TestGetSyncSubCommitteeIndex(t *testing.T) { test.validatorsErr, ).Times(1) + if test.validatorsErr != nil { + // Then try the GET call which will also return error. + queryParams := url.Values{} + for _, id := range valsReq.Ids { + queryParams.Add("id", id) + } + for _, st := range valsReq.Statuses { + queryParams.Add("status", st) + } + + query := buildURL("/eth/v1/beacon/states/head/validators", queryParams) + + jsonRestHandler.EXPECT().Get( + ctx, + query, + &structs.GetValidatorsResponse{}, + ).Return( + test.validatorsErr, + ).Times(1) + } + validatorIndicesBytes, err := json.Marshal([]string{validatorIndex}) require.NoError(t, err) diff --git a/validator/client/grpc-api/grpc_validator_client.go b/validator/client/grpc-api/grpc_validator_client.go index c97e1a621709..4036396a1f85 100644 --- a/validator/client/grpc-api/grpc_validator_client.go +++ b/validator/client/grpc-api/grpc_validator_client.go @@ -138,6 +138,10 @@ func (c *grpcValidatorClient) AggregatedSigAndAggregationBits( return c.beaconNodeValidatorClient.AggregatedSigAndAggregationBits(ctx, in) } +func (grpcValidatorClient) GetAggregatedSelections(context.Context, []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) { + return nil, iface.ErrNotSupported +} + func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient { return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)} } diff --git a/validator/client/iface/BUILD.bazel b/validator/client/iface/BUILD.bazel index 4acfedda99f1..8d6cc9805beb 100644 --- a/validator/client/iface/BUILD.bazel +++ b/validator/client/iface/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/validator-client:go_default_library", "//validator/keymanager:go_default_library", + "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_golang_protobuf//ptypes/empty", "@com_github_pkg_errors//:go_default_library", ], diff --git a/validator/client/iface/validator_client.go b/validator/client/iface/validator_client.go index bb04fb3f72bf..473afe5301f6 100644 --- a/validator/client/iface/validator_client.go +++ b/validator/client/iface/validator_client.go @@ -2,12 +2,66 @@ package iface import ( "context" + "encoding/json" + "strconv" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/pkg/errors" "github.com/golang/protobuf/ptypes/empty" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ) +type BeaconCommitteeSelection struct { + SelectionProof []byte + Slot primitives.Slot + ValidatorIndex primitives.ValidatorIndex +} + +type beaconCommitteeSelectionJson struct { + SelectionProof string `json:"selection_proof"` + Slot string `json:"slot"` + ValidatorIndex string `json:"validator_index"` +} + +func (b BeaconCommitteeSelection) MarshalJSON() ([]byte, error) { + return json.Marshal(beaconCommitteeSelectionJson{ + SelectionProof: hexutil.Encode(b.SelectionProof), + Slot: strconv.FormatUint(uint64(b.Slot), 10), + ValidatorIndex: strconv.FormatUint(uint64(b.ValidatorIndex), 10), + }) +} + +func (b *BeaconCommitteeSelection) UnmarshalJSON(input []byte) error { + var bjson beaconCommitteeSelectionJson + err := json.Unmarshal(input, &bjson) + if err != nil { + return errors.Wrap(err, "failed to unmarshal beacon committee selection") + } + + slot, err := strconv.ParseUint(bjson.Slot, 10, 64) + if err != nil { + return errors.Wrap(err, "failed to parse slot") + } + + vIdx, err := strconv.ParseUint(bjson.ValidatorIndex, 10, 64) + if err != nil { + return errors.Wrap(err, "failed to parse validator index") + } + + selectionProof, err := hexutil.Decode(bjson.SelectionProof) + if err != nil { + return errors.Wrap(err, "failed to parse selection proof") + } + + b.Slot = primitives.Slot(slot) + b.SelectionProof = selectionProof + b.ValidatorIndex = primitives.ValidatorIndex(vIdx) + + return nil +} + type ValidatorClient interface { GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error) @@ -36,4 +90,5 @@ type ValidatorClient interface { SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) StartEventStream(ctx context.Context) error EventStreamIsRunning() bool + GetAggregatedSelections(ctx context.Context, selections []BeaconCommitteeSelection) ([]BeaconCommitteeSelection, error) } diff --git a/validator/client/service.go b/validator/client/service.go index d10d3a80c2b1..c3124f047536 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -56,6 +56,7 @@ type ValidatorService struct { useWeb bool emitAccountMetrics bool logValidatorBalances bool + distributed bool interopKeysConfig *local.InteropKeymanagerConfig conn validatorHelpers.NodeConnection grpcRetryDelay time.Duration @@ -83,6 +84,7 @@ type Config struct { UseWeb bool LogValidatorBalances bool EmitAccountMetrics bool + Distributed bool InteropKeysConfig *local.InteropKeymanagerConfig Wallet *wallet.Wallet WalletInitializedFeed *event.Feed @@ -131,6 +133,7 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e Web3SignerConfig: cfg.Web3SignerConfig, proposerSettings: cfg.ProposerSettings, validatorsRegBatchSize: cfg.ValidatorsRegBatchSize, + distributed: cfg.Distributed, } dialOpts := ConstructDialOptions( @@ -230,6 +233,8 @@ func (v *ValidatorService) Start() { proposerSettings: v.proposerSettings, walletInitializedChannel: make(chan *wallet.Wallet, 1), validatorsRegBatchSize: v.validatorsRegBatchSize, + distributed: v.distributed, + attSelections: make(map[attSelectionKey]iface.BeaconCommitteeSelection), } v.validator = valStruct diff --git a/validator/client/validator.go b/validator/client/validator.go index 629fb9bc056c..92e1b81b0bfb 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -67,12 +67,14 @@ type validator struct { logValidatorBalances bool useWeb bool emitAccountMetrics bool + distributed bool domainDataLock sync.RWMutex attLogsLock sync.Mutex aggregatedSlotCommitteeIDCacheLock sync.Mutex highestValidSlotLock sync.Mutex prevBalanceLock sync.RWMutex slashableKeysLock sync.RWMutex + attSelectionLock sync.Mutex eipImportBlacklistedPublicKeys map[[fieldparams.BLSPubkeyLength]byte]bool walletInitializedFeed *event.Feed attLogs map[[32]byte]*attSubmitted @@ -82,6 +84,7 @@ type validator struct { prevBalance map[[fieldparams.BLSPubkeyLength]byte]uint64 pubkeyToValidatorIndex map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex signedValidatorRegistrations map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1 + attSelections map[attSelectionKey]iface.BeaconCommitteeSelection graffitiOrderedIndex uint64 aggregatedSlotCommitteeIDCache *lru.Cache domainDataCache *ristretto.Cache @@ -113,6 +116,11 @@ type validatorStatus struct { index primitives.ValidatorIndex } +type attSelectionKey struct { + slot primitives.Slot + index primitives.ValidatorIndex +} + // Done cleans up the validator. func (v *validator) Done() { v.ticker.Done() @@ -629,6 +637,13 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes subscribeValidatorIndices := make([]primitives.ValidatorIndex, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) alreadySubscribed := make(map[[64]byte]bool) + if v.distributed { + // Get aggregated selection proofs to calculate isAggregator. + if err := v.getAggregatedSelectionProofs(ctx, res); err != nil { + return errors.Wrap(err, "could not get aggregated selection proofs") + } + } + for _, duty := range res.CurrentEpochDuties { pk := bytesutil.ToBytes48(duty.PublicKey) if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING { @@ -641,7 +656,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes continue } - aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, pk) + aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, pk, validatorIndex) if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } @@ -667,7 +682,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes continue } - aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey)) + aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey), validatorIndex) if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } @@ -718,7 +733,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie if duty.AttesterSlot == slot { roles = append(roles, iface.RoleAttester) - aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey)) + aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex) if err != nil { return nil, errors.Wrap(err, "could not check if a validator is an aggregator") } @@ -773,15 +788,26 @@ func (v *validator) Keymanager() (keymanager.IKeymanager, error) { // isAggregator checks if a validator is an aggregator of a given slot and committee, // it uses a modulo calculated by validator count in committee and samples randomness around it. -func (v *validator) isAggregator(ctx context.Context, committee []primitives.ValidatorIndex, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) (bool, error) { +func (v *validator) isAggregator(ctx context.Context, committee []primitives.ValidatorIndex, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) { modulo := uint64(1) if len(committee)/int(params.BeaconConfig().TargetAggregatorsPerCommittee) > 1 { modulo = uint64(len(committee)) / params.BeaconConfig().TargetAggregatorsPerCommittee } - slotSig, err := v.signSlotWithSelectionProof(ctx, pubKey, slot) - if err != nil { - return false, err + var ( + slotSig []byte + err error + ) + if v.distributed { + slotSig, err = v.getAttSelection(attSelectionKey{slot: slot, index: validatorIndex}) + if err != nil { + return false, err + } + } else { + slotSig, err = v.signSlotWithSelectionProof(ctx, pubKey, slot) + if err != nil { + return false, err + } } b := hash.Hash(slotSig) @@ -1230,6 +1256,89 @@ func (v *validator) validatorIndex(ctx context.Context, pubkey [fieldparams.BLSP return resp.Index, true, nil } +func (v *validator) getAggregatedSelectionProofs(ctx context.Context, duties *ethpb.DutiesResponse) error { + // Create new instance of attestation selections map. + v.newAttSelections() + + var req []iface.BeaconCommitteeSelection + for _, duty := range duties.CurrentEpochDuties { + if duty.Status != ethpb.ValidatorStatus_ACTIVE && duty.Status != ethpb.ValidatorStatus_EXITING { + continue + } + + pk := bytesutil.ToBytes48(duty.PublicKey) + slotSig, err := v.signSlotWithSelectionProof(ctx, pk, duty.AttesterSlot) + if err != nil { + return err + } + + req = append(req, iface.BeaconCommitteeSelection{ + SelectionProof: slotSig, + Slot: duty.AttesterSlot, + ValidatorIndex: duty.ValidatorIndex, + }) + } + + for _, duty := range duties.NextEpochDuties { + if duty.Status != ethpb.ValidatorStatus_ACTIVE && duty.Status != ethpb.ValidatorStatus_EXITING { + continue + } + + pk := bytesutil.ToBytes48(duty.PublicKey) + slotSig, err := v.signSlotWithSelectionProof(ctx, pk, duty.AttesterSlot) + if err != nil { + return err + } + + req = append(req, iface.BeaconCommitteeSelection{ + SelectionProof: slotSig, + Slot: duty.AttesterSlot, + ValidatorIndex: duty.ValidatorIndex, + }) + } + + resp, err := v.validatorClient.GetAggregatedSelections(ctx, req) + if err != nil { + return err + } + + // Store aggregated selection proofs in state. + v.addAttSelections(resp) + + return nil +} + +func (v *validator) addAttSelections(selections []iface.BeaconCommitteeSelection) { + v.attSelectionLock.Lock() + defer v.attSelectionLock.Unlock() + + for _, s := range selections { + v.attSelections[attSelectionKey{ + slot: s.Slot, + index: s.ValidatorIndex, + }] = s + } +} + +func (v *validator) newAttSelections() { + v.attSelectionLock.Lock() + defer v.attSelectionLock.Unlock() + + v.attSelections = make(map[attSelectionKey]iface.BeaconCommitteeSelection) +} + +func (v *validator) getAttSelection(key attSelectionKey) ([]byte, error) { + v.attSelectionLock.Lock() + defer v.attSelectionLock.Unlock() + + s, ok := v.attSelections[key] + if !ok { + return nil, errors.Errorf("selection proof not found for the given slot=%d and validator_index=%d", key.slot, key.index) + } + + return s.SelectionProof, nil +} + // This constructs a validator subscribed key, it's used to track // which subnet has already been pending requested. func validatorSubscribeKey(slot primitives.Slot, committeeID primitives.CommitteeIndex) [64]byte { diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index d041a3d87b15..7b45d7c26b4c 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -639,6 +639,92 @@ func TestUpdateDuties_AllValidatorsExited(t *testing.T) { } +func TestUpdateDuties_Distributed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := validatormock.NewMockValidatorClient(ctrl) + + // Start of third epoch. + slot := 2 * params.BeaconConfig().SlotsPerEpoch + keys := randKeypair(t) + resp := ðpb.DutiesResponse{ + CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + { + AttesterSlot: slot, // First slot in epoch. + ValidatorIndex: 200, + CommitteeIndex: 100, + PublicKey: keys.pub[:], + Status: ethpb.ValidatorStatus_ACTIVE, + }, + }, + NextEpochDuties: []*ethpb.DutiesResponse_Duty{ + { + AttesterSlot: slot + params.BeaconConfig().SlotsPerEpoch, // First slot in next epoch. + ValidatorIndex: 200, + CommitteeIndex: 100, + PublicKey: keys.pub[:], + Status: ethpb.ValidatorStatus_ACTIVE, + }, + }, + } + + v := validator{ + keyManager: newMockKeymanager(t, keys), + validatorClient: client, + distributed: true, + } + + sigDomain := make([]byte, 32) + + client.EXPECT().GetDuties( + gomock.Any(), + gomock.Any(), + ).Return(resp, nil) + + client.EXPECT().DomainData( + gomock.Any(), // ctx + gomock.Any(), // epoch + ).Return( + ðpb.DomainResponse{SignatureDomain: sigDomain}, + nil, /*err*/ + ).Times(2) + + client.EXPECT().GetAggregatedSelections( + gomock.Any(), + gomock.Any(), // fill this properly + ).Return( + []iface.BeaconCommitteeSelection{ + { + SelectionProof: make([]byte, 32), + Slot: slot, + ValidatorIndex: 200, + }, + { + SelectionProof: make([]byte, 32), + Slot: slot + params.BeaconConfig().SlotsPerEpoch, + ValidatorIndex: 200, + }, + }, + nil, + ) + + var wg sync.WaitGroup + wg.Add(1) + + client.EXPECT().SubscribeCommitteeSubnets( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []primitives.ValidatorIndex) (*emptypb.Empty, error) { + wg.Done() + return nil, nil + }) + + require.NoError(t, v.UpdateDuties(context.Background(), slot), "Could not update assignments") + util.WaitTimeout(&wg, 2*time.Second) + require.Equal(t, 2, len(v.attSelections)) +} + func TestRolesAt_OK(t *testing.T) { v, m, validatorKey, finish := setup(t) defer finish() @@ -2173,7 +2259,7 @@ func TestValidator_buildSignedRegReqs_DefaultConfigDisabled(t *testing.T) { ctx := context.Background() client := validatormock.NewMockValidatorClient(ctrl) - signature := blsmock.NewSignature(ctrl) + signature := blsmock.NewMockSignature(ctrl) signature.EXPECT().Marshal().Return([]byte{}) v := validator{ @@ -2258,7 +2344,7 @@ func TestValidator_buildSignedRegReqs_DefaultConfigEnabled(t *testing.T) { ctx := context.Background() client := validatormock.NewMockValidatorClient(ctrl) - signature := blsmock.NewSignature(ctrl) + signature := blsmock.NewMockSignature(ctrl) signature.EXPECT().Marshal().Return([]byte{}).Times(2) v := validator{ @@ -2380,7 +2466,7 @@ func TestValidator_buildSignedRegReqs_TimestampBeforeGenesis(t *testing.T) { ctx := context.Background() client := validatormock.NewMockValidatorClient(ctrl) - signature := blsmock.NewSignature(ctrl) + signature := blsmock.NewMockSignature(ctrl) v := validator{ signedValidatorRegistrations: map[[48]byte]*ethpb.SignedValidatorRegistrationV1{}, diff --git a/validator/node/node.go b/validator/node/node.go index dc33784abf3f..b343c34189bf 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -497,6 +497,7 @@ func (c *ValidatorClient) registerValidatorService(cliCtx *cli.Context) error { BeaconApiTimeout: time.Second * 30, BeaconApiEndpoint: c.cliCtx.String(flags.BeaconRESTApiProviderFlag.Name), ValidatorsRegBatchSize: c.cliCtx.Int(flags.ValidatorsRegistrationBatchSizeFlag.Name), + Distributed: c.cliCtx.Bool(flags.EnableDistributed.Name), }) if err != nil { return errors.Wrap(err, "could not initialize validator service")