Redesign the runner #14096
base: develop
Conversation
Force-pushed from 9f030c5 to ca08db6, then from ca08db6 to 0da32f4.
validator/client/runner.go
Outdated
@@ -125,18 +126,39 @@ func run(ctx context.Context, v iface.Validator) {
 				continue
 			}
 			performRoles(slotCtx, allRoles, v, slot, &wg, span)
-		case isHealthyAgain := <-healthTracker.HealthUpdates():
-			if isHealthyAgain {
+		case <-v.LastSecondOfSlot():
I'm kinda worried about the contents of this delaying the normal processing of the next slot above. If it happens at the last second of the slot, wouldn't that mean we only have 1 second to do the below? Otherwise this loop would be delayed.
with the newest changes there is next to no work here
func (n *NodeHealthTracker) HealthUpdates() <-chan bool {
	return n.healthChan

log.Infof("Starting health check routine. Health check will be performed every %d seconds", params.BeaconConfig().SecondsPerSlot)
ticker := time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
nice this seems like an improvement
@@ -269,6 +274,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
	}

	v.genesisTime = chainStartRes.GenesisTime
	log.WithField("genesisTime", time.Unix(int64(v.genesisTime), 0)).Info("Beacon chain started")
Moved from setTicker
v.dutiesLock.Unlock()

v.logDuties(slot, resp.CurrentEpochDuties, resp.NextEpochDuties)
No need to hold the lock
validator/client/runner.go
Outdated
v.ChangeHost()
initializationNeeded = true
updateDutiesNeeded = true
One question that might come up is "why do we update duties but don't push proposer settings here?"
The answer is because they would have been pushed immediately, without waiting for the beacon node to become healthy, whereas update duties will not be called before the node comes back up.
validator/client/runner.go
Outdated
	}
}()
pushProposerSettingsChan <- headSlot
Does this trigger make sense? You can get the canonical head slot from within that goroutine too, right? You can call v.CanonicalHeadSlot.
case <-ctx.Done():
	return
case slot := <-pushProposerSettingsChan:
	go func() {
I'm afraid of spawning too many go channels
Do you mean too many goroutines? They are super lightweight, it's fine to have thousands of them
right too many goroutines 🤔
validator/client/runner.go
Outdated
} else {
	log.WithError(err).Fatal("Failed to update proposer settings") // allow fatal. skipcq

pushProposerSettingsChan := make(chan primitives.Slot, 1)
If you start the validator on the first slot of the epoch, would it block or error on this? (It should send on this channel twice.)
Block, but only for a split second because each receive from the channel spawns a goroutine
slotSpan   *trace.Span
slotCtx    context.Context
slotCancel context.CancelFunc
initializationNeeded = false
maybe we should just have some kind of validator client status, feels hacky to use a variable like this
I agree it might be a good idea, but would probably result in a much different design, further complicating the already non-trivial PR.
@@ -1,112 +0,0 @@
package beacon
Deleting all tests for the health tracker? Shouldn't there be some tests?
Two reasons:
- there is barely any logic
- the update takes place every 12 seconds, so the test would be super slow (we could make the interval a parameter, but it would be only for testing, therefore I am not a big fan of this)
@@ -2,11 +2,8 @@ load("@prysm//tools/go:def.bzl", "go_library")

go_library(
	name = "go_default_library",
    name = "go_default_library",
    testonly = True,
Please add the testonly attribute so we are sure this doesn't link into a production binary.
c.eventStreamLock.Lock()
defer c.eventStreamLock.Unlock()
Is this called from multiple go routines? If so, we may want to use RLock here
No, starting should be done from only one place.
_, hasDeadline := ctx.Deadline()
if !hasDeadline {
	var cancel context.CancelFunc
	ctx, cancel = context.WithTimeout(ctx, c.timeout)
	defer cancel()
}
Why only set a timeout if one isn't set already? I would expect c.timeout to set the timeout every time.
context.Context will cancel if the parent context has a shorter deadline anyway, so it should be safe to always set the timeout.
The idea here is to not set the timeout to `c.timeout` when the context has a deadline longer than `c.timeout`. Otherwise the request will finish too quickly. Currently we use a custom `Client` with a pre-set timeout. Even though we want to wait up to 5 minutes when pushing proposer settings, the client's timeout disallows this.
_, hasDeadline := ctx.Deadline()
if !hasDeadline {
	var cancel context.CancelFunc
	ctx, cancel = context.WithTimeout(ctx, c.timeout)
	defer cancel()
}
Same question here, I think you should always set the timeout.
log.WithError(err).Fatal("Failed to update proposer settings") // allow fatal. skipcq

pushProposerSettingsChan := make(chan struct{}, 1)
go func() {
I wonder if breaking out some of these goroutines into other functions would make this cleaner to review... there's a lot going on in this file and a lot of variables declared.
	}
	}
}()
pushProposerSettingsChan <- struct{}{}
is this supposed to push this empty struct? why?
You need to push something to the channel. The empty struct means that the value is not important and should be discarded.
UPDATE

I made some changes since the original description. The main realization is that the main `for` loop should be as "pure" as possible. With every new case we increase the likelihood of duty handling not starting at the very beginning of the slot. Originally the main loop had only one purpose, which was handling duties. I wanted to go back to this, so I moved all other channels to goroutines dedicated just to them. That way there is no contention in the `select` statement and the slot ticker handler can fire immediately. I also removed `close(eventsChannel)` from the event stream because reading from a closed channel always succeeds, resulting in a never-ending sequence of empty events when the event stream gets disconnected. https://rauljordan.com/no-sleep-until-we-build-the-perfect-library-in-go/ is a great resource showing what we should build eventually.

Main functionality
The purpose of this PR is to make the validator client's runner more robust. This can be split into 3 categories.
1. Updating duties at proper times
This PR introduces an `updateDutiesNeeded` variable that is initially set to `false` and is checked every slot. We update duties once before the main `for` loop of the runner and then conditionally at each slot start if needed.

Currently `UpdateDuties` exits early if the validator has any duties and we are not at the start of an epoch. This means that the runner itself doesn't have control over when duties will be updated, as it can't enforce the update. Because of this we remove the condition from `UpdateDuties` and leave the decision to the caller.

2. Pushing proposer settings at proper times
Similarly to updating duties, there are times when we want to push proposer settings. But while updating duties should be blocking, because performing roles depends on it, pushing proposer settings can be done asynchronously. If the validator client serves many keys, pushing proposer settings can take a non-trivial amount of time, and we want to keep attesting/proposing in the meantime.
This PR utilizes a goroutine+channel combination to signal that proposer settings should be updated. This allows a simple one-liner if we need to add more triggers in the future, and is probably quite idiomatic Go.
3. Switching over to a backup node
This PR simplifies how we track the health status of the beacon node inside the runner, switching to a polling solution instead of using a channel. Polling the latest value is much simpler when we consider that the current implementation works only for one subscriber, whereas the polling mechanism works for any number of subscribers. Therefore adding more subscribers in the future is trivial.
We move the health ticker into the tracker and update the health status internally. We also add a new ticker to the runner that gets fired at the last second of the slot, upon which we poll for health. One additional change is introducing an `initializationNeeded` variable that signals we need to make sure the beacon node is synced before we try performing duties again. Without this, switching over to a beacon node that is not 100% ready (or the original node becoming healthy again) results in lots of errors in the validator client's output.

Other
There are a few other things included in this PR, which are small improvements and bug fixes.
- Remove the timeout from the HTTP client used by `JsonRestHandler` and save the default timeout in the struct. This is because setting a timeout on an HTTP client ignores the timeout set on a request's context. In our case, pushing proposer settings has a timeout of 5 minutes, but it would never be respected.
- Protect the `pubkeyToValidatorIndex` map with a lock. Otherwise there is a possibility of a panic due to concurrent map writes (I ran into it when testing pushing proposer settings).

Follow-up questions
- The `for` loop execution can become nondeterministic, resulting in things like proposer settings being pushed several times, or the event stream being started several times. Although this could probably be fixed by changing how we execute each of these, similar issues might arise in the future where we rely on a specific code path being executed only once, but the `for` loop behaves unexpectedly. Can we make it more predictable?