Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework validator initializations #14779

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion validator/accounts/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,17 @@ type Validator struct {
proposerSettings *proposer.Settings
}

// Init is not implemented by this mock; calling it panics.
func (*Validator) Init(context.Context) error {
	panic("implement me")
}

// LogSubmittedSyncCommitteeMessages is a no-op in this mock.
func (*Validator) LogSubmittedSyncCommitteeMessages() {
}

// Done is not implemented by this mock; calling it panics.
func (*Validator) Done() {
	panic("implement me")
}

func (_ *Validator) WaitForChainStart(_ context.Context) error {
func (_ *Validator) WaitForChainStart(_ context.Context) (primitives.Slot, error) {
panic("implement me")
}

Expand Down
3 changes: 2 additions & 1 deletion validator/client/iface/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const (
// Validator interface defines the primary methods of a validator client.
type Validator interface {
Done()
WaitForChainStart(ctx context.Context) error
Init(ctx context.Context) error
WaitForChainStart(ctx context.Context) (primitives.Slot, error)
WaitForSync(ctx context.Context) error
WaitForActivation(ctx context.Context, accountsChangedChan chan [][fieldparams.BLSPubkeyLength]byte) error
CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error)
Expand Down
118 changes: 6 additions & 112 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var backOffPeriod = 10 * time.Second
// canceled.
//
// Order of operations:
// 1 - Initialize validator data
// 1 - Init validator data
// 2 - Wait for validator activation
// 3 - Wait for the next slot start
// 4 - Update assignments
Expand All @@ -39,13 +39,10 @@ func run(ctx context.Context, v iface.Validator) {
cleanup := v.Done
defer cleanup()

headSlot, err := initializeValidatorAndGetHeadSlot(ctx, v)
if err != nil {
if err := v.Init(ctx); err != nil {
return // Exit if context is canceled.
}
if err := v.UpdateDuties(ctx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
}

eventsChan := make(chan *event.Event, 1)
healthTracker := v.HealthTracker()
runHealthCheckRoutine(ctx, v, eventsChan)
Expand All @@ -56,15 +53,7 @@ func run(ctx context.Context, v iface.Validator) {
log.WithError(err).Fatal("Could not get keymanager")
}
sub := km.SubscribeAccountChanges(accountsChangedChan)
// check if proposer settings is still nil
// Set properties on the beacon node like the fee recipient for validators that are being used & active.
if v.ProposerSettings() == nil {
log.Warn("Validator client started without proposer settings such as fee recipient" +
" and will continue to use settings provided in the beacon node.")
}
if err := v.PushProposerSettings(ctx, km, headSlot, true); err != nil {
log.WithError(err).Fatal("Failed to update proposer settings")
}

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -120,13 +109,8 @@ func run(ctx context.Context, v iface.Validator) {
performRoles(slotCtx, allRoles, v, slot, &wg, span)
case isHealthyAgain := <-healthTracker.HealthUpdates():
if isHealthyAgain {
headSlot, err = initializeValidatorAndGetHeadSlot(ctx, v)
if err != nil {
log.WithError(err).Error("Failed to re initialize validator and get head slot")
continue
}
if err := v.UpdateDuties(ctx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
if err = v.Init(ctx); err != nil {
log.WithError(err).Error("Failed to re initialize validator")
continue
}
}
Expand Down Expand Up @@ -155,82 +139,6 @@ func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byt
}
}

// initializeValidatorAndGetHeadSlot runs the validator start-up sequence and
// returns the canonical head slot reported by v.
//
// The sequence is: wait for chain start, wait for keymanager initialization,
// wait for beacon node sync, wait for activation, fetch the canonical head
// slot and run the doppelganger check. When a step reports a connection error
// (per isConnectionError) the whole sequence is retried after waiting on a
// ticker of backOffPeriod. Other failures are logged with Fatal.
//
// A non-nil error is returned only when ctx is canceled while waiting to retry.
func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (primitives.Slot, error) {
	ctx, span := prysmTrace.StartSpan(ctx, "validator.initializeValidatorAndGetHeadSlot")
	defer span.End()

	ticker := time.NewTicker(backOffPeriod)
	defer ticker.Stop()

	firstTime := true

	var (
		headSlot primitives.Slot
		err      error
	)

	for {
		if !firstTime {
			// Check cancellation first so an already-canceled context always wins
			// over a pending tick.
			if ctx.Err() != nil {
				log.Info("Context canceled, stopping validator")
				return headSlot, errors.New("context canceled")
			}
			// Wait for the back-off tick, but stop waiting as soon as the context
			// is canceled instead of sleeping through the rest of the period.
			select {
			case <-ctx.Done():
				log.Info("Context canceled, stopping validator")
				return headSlot, errors.New("context canceled")
			case <-ticker.C:
			}
		}

		firstTime = false

		if err := v.WaitForChainStart(ctx); err != nil {
			if isConnectionError(err) {
				log.WithError(err).Warn("Could not determine if beacon chain started")
				continue
			}

			log.WithError(err).Fatal("Could not determine if beacon chain started")
		}

		if err := v.WaitForKeymanagerInitialization(ctx); err != nil {
			// log.Fatal will prevent defer from being called
			v.Done()
			log.WithError(err).Fatal("Wallet is not ready")
		}

		if err := v.WaitForSync(ctx); err != nil {
			if isConnectionError(err) {
				// Message matches the fatal branch below; it previously repeated
				// the chain-start wording.
				log.WithError(err).Warn("Could not determine if beacon node synced")
				continue
			}

			log.WithError(err).Fatal("Could not determine if beacon node synced")
		}

		if err := v.WaitForActivation(ctx, nil /* accountsChangedChan */); err != nil {
			log.WithError(err).Fatal("Could not wait for validator activation")
		}

		headSlot, err = v.CanonicalHeadSlot(ctx)
		if isConnectionError(err) {
			log.WithError(err).Warn("Could not get current canonical head slot")
			continue
		}

		if err != nil {
			log.WithError(err).Fatal("Could not get current canonical head slot")
		}

		if err := v.CheckDoppelGanger(ctx); err != nil {
			if isConnectionError(err) {
				log.WithError(err).Warn("Could not wait for checking doppelganger")
				continue
			}

			log.WithError(err).Fatal("Could not succeed with doppelganger check")
		}
		break
	}
	return headSlot, nil
}

func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.ValidatorRole, v iface.Validator, slot primitives.Slot, wg *sync.WaitGroup, span trace.Span) {
for pubKey, roles := range allRoles {
wg.Add(len(roles))
Expand Down Expand Up @@ -310,20 +218,6 @@ func runHealthCheckRoutine(ctx context.Context, v iface.Validator, eventsChan ch
if !tracker.CheckHealth(ctx) {
continue // Skip to the next ticker
}

km, err := v.Keymanager()
if err != nil {
log.WithError(err).Error("Could not get keymanager")
return
}
slot, err := v.CanonicalHeadSlot(ctx)
if err != nil {
log.WithError(err).Error("Could not get canonical head slot")
return
}
if err := v.PushProposerSettings(ctx, km, slot, true); err != nil {
log.WithError(err).Warn("Failed to update proposer settings")
}
}

// in case of node returning healthy but event stream died
Expand Down
10 changes: 7 additions & 3 deletions validator/client/testutil/mock_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (fv *FakeValidator) Done() {
fv.DoneCalled = true
}

// Init for mocking. It always reports success.
func (fv *FakeValidator) Init(_ context.Context) error {
	var err error
	return err
}

// WaitForKeymanagerInitialization for mocking.
func (fv *FakeValidator) WaitForKeymanagerInitialization(_ context.Context) error {
fv.WaitForWalletInitializationCalled = true
Expand All @@ -80,12 +84,12 @@ func (fv *FakeValidator) WaitForKeymanagerInitialization(_ context.Context) erro
func (fv *FakeValidator) LogSubmittedSyncCommitteeMessages() {}

// WaitForChainStart for mocking.
func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
func (fv *FakeValidator) WaitForChainStart(_ context.Context) (primitives.Slot, error) {
fv.WaitForChainStartCalled++
if fv.RetryTillSuccess >= fv.WaitForChainStartCalled {
return api.ErrConnectionIssue
return 0, api.ErrConnectionIssue
}
return nil
return 0, nil
}

// WaitForActivation for mocking.
Expand Down
103 changes: 93 additions & 10 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,88 @@ func (v *validator) Done() {
v.ticker.Done()
}

// Init runs the validator start-up sequence and prepares it for its first slot.
//
// The sequence is: wait for chain start (which also yields the current slot),
// wait for keymanager initialization, wait for beacon node sync, wait for
// activation and run the doppelganger check. When a step reports a connection
// error (per isConnectionError) the whole sequence is retried after waiting on
// a ticker of backOffPeriod. Other failures in the sequence are logged with
// Fatal.
//
// After the sequence succeeds, duties are updated for the current slot and the
// proposer settings are pushed to the beacon node.
//
// A non-nil error is returned only when ctx is canceled while waiting to retry.
func (v *validator) Init(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "validator.Init")
	defer span.End()

	ticker := time.NewTicker(backOffPeriod)
	defer ticker.Stop()

	firstTime := true
	var (
		currentSlot primitives.Slot
		err         error
	)
	for {
		if !firstTime {
			// Check cancellation first so an already-canceled context always wins
			// over a pending tick.
			if ctx.Err() != nil {
				log.Info("Context canceled, stopping validator")
				return errors.New("context canceled")
			}
			// Wait for the back-off tick, but stop waiting as soon as the context
			// is canceled instead of sleeping through the rest of the period.
			select {
			case <-ctx.Done():
				log.Info("Context canceled, stopping validator")
				return errors.New("context canceled")
			case <-ticker.C:
			}
		}

		firstTime = false
		currentSlot, err = v.WaitForChainStart(ctx)
		if err != nil {
			if isConnectionError(err) {
				log.WithError(err).Warn("Could not determine if beacon chain started")
				continue
			}

			log.WithError(err).Fatal("Could not determine if beacon chain started")
		}

		if err := v.WaitForKeymanagerInitialization(ctx); err != nil {
			// log.Fatal will prevent defer from being called
			v.Done()
			log.WithError(err).Fatal("Wallet is not ready")
		}

		if err := v.WaitForSync(ctx); err != nil {
			if isConnectionError(err) {
				// Message matches the fatal branch below; it previously repeated
				// the chain-start wording.
				log.WithError(err).Warn("Could not determine if beacon node synced")
				continue
			}

			log.WithError(err).Fatal("Could not determine if beacon node synced")
		}

		if err := v.WaitForActivation(ctx, nil /* accountsChangedChan */); err != nil {
			log.WithError(err).Fatal("Could not wait for validator activation")
		}

		if err := v.CheckDoppelGanger(ctx); err != nil {
			if isConnectionError(err) {
				log.WithError(err).Warn("Could not wait for checking doppelganger")
				continue
			}

			log.WithError(err).Fatal("Could not succeed with doppelganger check")
		}
		break
	}
	// TODO: should there be a check for being too late into the current slot?
	if err := v.UpdateDuties(ctx, currentSlot); err != nil {
		handleAssignmentError(err, currentSlot)
	}

	km, err := v.Keymanager()
	if err != nil {
		log.WithError(err).Fatal("Could not get keymanager")
	}
	// check if proposer settings is still nil
	// Set properties on the beacon node like the fee recipient for validators that are being used & active.
	if v.ProposerSettings() == nil {
		log.Warn("Validator client started without proposer settings such as fee recipient" +
			" and will continue to use settings provided in the beacon node.")
	}
	if err := v.PushProposerSettings(ctx, km, currentSlot, true); err != nil {
		log.WithError(err).Fatal("Failed to update proposer settings")
	}
	return nil
}

// WaitForKeymanagerInitialization checks if the validator needs to wait for keymanager initialization.
func (v *validator) WaitForKeymanagerInitialization(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForKeymanagerInitialization")
Expand Down Expand Up @@ -253,43 +335,44 @@ func recheckValidatingKeysBucket(ctx context.Context, valDB db.Database, km keym
// it calls to the beacon node which then verifies the ETH1.0 deposit contract logs to check
// for the ChainStart log to have been emitted. If so, it starts a ticker based on the ChainStart
// unix timestamp which will be used to keep track of time within the validator client.
func (v *validator) WaitForChainStart(ctx context.Context) error {
func (v *validator) WaitForChainStart(ctx context.Context) (primitives.Slot, error) {
ctx, span := trace.StartSpan(ctx, "validator.WaitForChainStart")
defer span.End()

var currentSlot primitives.Slot
// First, check if the beacon chain has started.
log.Info("Syncing with beacon node to align on chain genesis info")

chainStartRes, err := v.validatorClient.WaitForChainStart(ctx, &emptypb.Empty{})
if errors.Is(err, io.EOF) {
return client.ErrConnectionIssue
return currentSlot, client.ErrConnectionIssue
}

if errors.Is(ctx.Err(), context.Canceled) {
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
return currentSlot, errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
}

if err != nil {
return errors.Wrap(
return currentSlot, errors.Wrap(
client.ErrConnectionIssue,
errors.Wrap(err, "could not receive ChainStart from stream").Error(),
)
}

v.genesisTime = chainStartRes.GenesisTime
currentSlot = slots.CurrentSlot(chainStartRes.GenesisTime)

curGenValRoot, err := v.db.GenesisValidatorsRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get current genesis validators root")
return currentSlot, errors.Wrap(err, "could not get current genesis validators root")
}

if len(curGenValRoot) == 0 {
if err := v.db.SaveGenesisValidatorsRoot(ctx, chainStartRes.GenesisValidatorsRoot); err != nil {
return errors.Wrap(err, "could not save genesis validators root")
return currentSlot, errors.Wrap(err, "could not save genesis validators root")
}

v.setTicker()
return nil
return currentSlot, nil
}

if !bytes.Equal(curGenValRoot, chainStartRes.GenesisValidatorsRoot) {
Expand All @@ -299,15 +382,15 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
clear the database. If not, please file an issue at https://github.com/prysmaticlabs/prysm/issues`,
cmd.ClearDB.Name,
)
return fmt.Errorf(
return currentSlot, fmt.Errorf(
"genesis validators root from beacon node (%#x) does not match root saved in validator db (%#x)",
chainStartRes.GenesisValidatorsRoot,
curGenValRoot,
)
}

v.setTicker()
return nil
return currentSlot, nil
}

func (v *validator) setTicker() {
Expand Down
Loading