Parallelize header download across peers to fetch headers within a chain's checkpointed region. #282
Conversation
Thanks a lot for running the workflow. I have fixed the issues and would appreciate it if you could rerun the workflow, @yyforyongyu.
This commit makes the query package more flexible in querying peers. This is done by adding a sendQuery function field to the query.Request struct, instead of using only QueueMessageWithEncoding for all requests. This will be useful in coming commits, where we use pushGetHeadersMsg to fetch block headers from peers. Consequent changes: encoding was removed from the queryJob field as it is not useful for all requests; requests that need encoding can define it as a field of their own implementation of the new interface created as the type for the Req field in Request. The PriorityIndex function signature will be used in coming commits to indicate the priority a request should preferably have in a query batch. Implementations of the interface were created for GetCfheaders, GetCfilter and getData requests. Tests were updated and added to reflect these changes. Signed-off-by: Maureen Ononiwu <[email protected]>
Signed-off-by: Maureen Ononiwu <[email protected]>
The worker now waits for a response after a timeout, but the job is also rescheduled on another worker so that the batch is not slowed down by one worker. Signed-off-by: Maureen Ononiwu <[email protected]>
- The workmanager now checks if a request has a priority index before assigning a query index. - The job index type is also changed to float64 in this commit for flexibility. Signed-off-by: Maureen Ononiwu <[email protected]>
Adds a CloneReq function field to the query.Request struct. Jobs are cloned in the worker before being sent to the workmanager. This will be useful in coming commits where a job's request is modified according to the response it gets, such as in the case of block header fetching. A CloneReq function is defined for the GetCFilter, GetCFHeader and GetData requests in this commit as well. Signed-off-by: Maureen Ononiwu <[email protected]>
- Added an Unfinished bool to jobResult to indicate successful jobs that still need to send another request to the peer to be considered complete. - Made ErrChan a query option so that it is optional for different queries. - Refactored HandleResp: the peer is now passed as query.Peer instead of its address. - Changed the type of query.Progress. Signed-off-by: Maureen Ononiwu <[email protected]>
This commit distributes header download across peers, leveraging checkpoints and the workmanager. Signed-off-by: Maureen Ononiwu <[email protected]>
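Taken together, the commits above reshape query.Request around a ReqMessage interface plus per-request send and clone functions. A minimal sketch of what that shape might look like, assuming names and signatures inferred from the commit messages and the snippets reviewed below rather than the PR's literal code (Peer and Progress stand in for the query package's existing types):

// Sketch only: approximates the shape described in the commit messages.
type ReqMessage interface {
	// Message returns the wire message to send to the peer.
	Message() wire.Message

	// PriorityIndex hints at the position this request should take in
	// a batch; zero means first-come, first-served.
	PriorityIndex() uint64
}

type Request struct {
	// Req carries the request-specific data, e.g. a headerQuery or an
	// encodedQuery wrapping a MsgGetCFHeaders.
	Req ReqMessage

	// SendQuery dispatches Req to the peer, e.g. via
	// QueueMessageWithEncoding or pushGetHeadersMsg.
	SendQuery func(peer Peer, req ReqMessage) error

	// CloneReq copies the request so a job can be re-issued with a
	// modified range without mutating the original.
	CloneReq func(message ReqMessage) ReqMessage

	// HandleResp validates a response against the request.
	HandleResp func(req ReqMessage, resp wire.Message, peer Peer) Progress
}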
Hello @positiveblue, this PR is ready for review.
blockmanager.go (outdated)

// PriorityIndex returns the specified priority the caller wants
// the request to take.
func (e *encodedQuery) PriorityIndex() uint64 {
What's the rationale for the priority index? It's adding quite a bit of complexity to the code.
Would the caller asking for block headers within a moving window suffice instead of having a priority index?
The work manager assigns priority to jobs on a first-come, first-served basis. There are cases where we might need to bump the priority of a job sent at a later time; that is where the priority index comes in. Example use case:
https://github.com/lightninglabs/neutrino/pull/282/files#diff-2345cea8b12f07d1f60d42b7e5563720e69e1544c641261cf147668c4599320bR2227 (I will be changing the priority to a non-zero value in my next push, though)
This is where we might need to re-request headers after validation while fetching headers within the checkpointed region. If there were no priority index, this particular job would be handled last, after all other jobs. We do not want that, because we could not progress with block header verification and writing into the database without the headers we re-requested at that particular height (validation is done in order): they sit at the chain tip and are therefore next in line to be processed.
Could you please explain more about what you mean by this: "Would the caller asking for block headers within a moving window suffice instead of having a priority index?"
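To make the ordering idea concrete, here is a minimal sketch of how a non-zero priority index could override arrival order when the workmanager picks an index for a new job; the function and variable names are hypothetical, not the PR's code:

// Sketch only: a job submitted later can still be scheduled ahead of
// queued work if its request carries a non-zero priority index.
func indexForJob(nextArrivalIndex float64, req ReqMessage) float64 {
	if p := req.PriorityIndex(); p != 0 {
		// A re-request at the current header tip gets a low index so
		// that in-order validation and writing can keep progressing.
		return float64(p)
	}

	// Otherwise keep first-come, first-served ordering.
	return nextArrivalIndex
}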
query/worker.go (outdated)

case <-quit:
	return
}
goto nexJobLoop
Would a continue suffice here? Is there anything that this goto provides that a continue wouldn't do?
Thanks, looking at it again I think continue could work.
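For illustration, a rough sketch of the loop shape being discussed, with continue in place of the goto/label pair; the channel and helper names here (nextJob, quit, handle, job.canceled) are placeholders, not the worker's actual identifiers:

// Sketch only: inside the worker's main for loop, continue abandons the
// current job and goes back to waiting for the next one, which is what
// the goto nexJobLoop was doing.
for {
	select {
	case job := <-nextJob:
		if job.canceled() {
			// Was: goto nexJobLoop. A continue returns to the top
			// of the for loop and waits for the next job.
			continue
		}
		handle(job)

	case <-quit:
		return
	}
}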
// IsWorkerEligibleForBlkHdrFetch is the eligibility function used for the BlockHdrWorkManager to determine workers
// eligible to receive jobs (the job is to fetch headers). If the peer is not a sync candidate or if its last known
// block height is behind the job query's start height, it returns false. Otherwise, it returns true.
func IsWorkerEligibleForBlkHdrFetch(r *activeWorker, next *queryJob) bool {
This shouldn't be needed. The workers should never be given peers that aren't eligible to be synced off of in the first place.
The work manager is not exclusively for fetching block headers, so it receives connected-peer messages for all peers. It just happens that in this use case, where it is fetching block headers, we are only interested in peers that fulfill the condition specified in that function; that is why I included it there. So a worker could be unsuited to fetching block headers but still eligible to fetch cfheaders or filters, for example.
Judging by the name, the "next" argument should be known to be a block header fetch. If you accept all types of queries here, then the function should be renamed and reworked to decide whether the peer is eligible for that request in general.
The name of the function is literally IsWorkerEligibleForBlkHdrFetch; does it still read as generic?
The thing is, the difference between the queryJob for fetching block headers and that for fetching cfheaders is the Req field in Request:
https://github.com/Chinwendu20/neutrino/blob/64b278771ff75da0b30136af7d4ab0ace06d897e/query/interface.go#L152-L175
which is defined as an interface. Its implementations are defined in blockmanager.go, so dealing with its concrete implementations in the query package would lead to a circular dependency.
Maybe there is a better way to go about it.
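One possible direction, sketched under the assumption that the request itself can answer the eligibility question: extend the interface that headerQuery already implements in blockmanager.go, so the query package never needs the concrete type. The interface and method names here are hypothetical:

// Sketch only: a hypothetical addition to the request interface. Since
// the concrete implementations live in blockmanager.go, the query
// package can call this without creating a circular dependency.
type peerEligibilityChecker interface {
	// IsPeerEligible reports whether the given peer can serve this
	// particular request, e.g. a header fetch would check that the
	// peer's last known block is not behind the query's start height.
	IsPeerEligible(peer Peer) bool
}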
First off I just want to say congratulations for putting up this PR in the first place. This code isn't the most approachable and it's not organized in a way that made your task easy. The task you set out to solve is an important one for getting this technology into more people's hands around the world.
That said, I do believe there is a ton of work to do on this PR before we can get it merged. While I did go ahead and leave a bunch of comments on individual items I saw, I think there is some more general work to be done here in terms of organization, so I strongly recommend attacking that first and using my comments as examples of what I'm talking about rather than as a TODO list of things to tidy up.
Overall, the biggest thing I'm seeing is that this code does not try to isolate concerns enough. When we are doing software engineering, the only way we can build up ever larger structures without having them fall apart is by packaging up complexity so that it doesn't leak. Abstractly, this allows us to entirely forget how a piece of code works internally and only reason about how we interact with it. Concretely this means a few things.
- As much as possible try to write functions that take data in as arguments and return data out via return values. This allows the readers of your API to understand the contract, rather than having to read the code itself, which defeats the purpose of packaging complexity
- Try to make each function and each struct only responsible for one thing. It is OK to have nested structs within each other. It is not OK to have 2 conceptually distinct subsystems sharing a struct.
- Naming things well makes the difference between people being able to follow your thoughts and not. Try to choose names that help people understand a thing's role, not its representation. After all, if you package complexity properly, readers should never have to care about the representation, but they will care a lot about how to use something. Secondly, in a statically typed language such as Go, the type system already gives us information about how something is represented, so there is no need to duplicate that information in the name.
- Avoid mixing "validation" code with "computation" code. You will inevitably need to sanitize and validate data, and after it is validated you may wish to do some computation over it. It may be tempting to package these into the same function, but that blurs your true API contract. When you write a function with a good signature (precise argument and return types), a reader of your code often does not have to read the implementation at all, which is a good thing. Doing the validation inside the function lets you be more permissive in the types the function accepts; while that might seem like a good thing at first, it makes it harder to understand what the code does.
As far as concrete design goes, I'll offer a bit of guidance here, and we can follow up on the LND Slack to iron out some details.
Conceptually you are trying to take the workload of the blockManager/workManager and do the following:
- get an outline of the full scope of work you need to do by getting the block headers and filter checkpoints
- creating a bunch of jobs to go fetch those series' of filter headers
- allocating those jobs to preferred peers
- tracking the progress of each of these jobs
- gluing the entire filter header chain together to make sure it lines up
Ideally you should be able to point to a single function that does each of these things. As I was reading the code, I was unable to. Perhaps this is a failure of my depth of understanding of the codebase, but even so, if I can't follow what is going on, it is likely others will have trouble as well. I want to reiterate, making this readable is not an easy task and the fact that you were able to get this far is quite an accomplishment, so don't be discouraged. It will take some thought to reimagine the approach to this PR but code is simply organized thought and you will be able to bring everything you've learned into the next revision. You also do not have to do this without support. You can ask questions in this thread, or better yet, reach out via Slack to run ideas by me. Looking forward to hearing from you.
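As a purely illustrative sketch of the "one function per responsibility" shape described above; every name here is hypothetical and not part of the PR:

// Sketch only: each step of the outline maps to one clearly named call.
func (b *blockManager) syncCheckpointedRegion() error {
	// Outline the full scope of work from the known checkpoints.
	plan, err := b.planCheckpointedWork()
	if err != nil {
		return err
	}

	// One job per checkpoint interval.
	jobs := b.buildHeaderJobs(plan)

	// Hand the jobs to preferred peers and track their progress.
	results := b.dispatchToPeers(jobs)

	// Glue the results together, validating and writing in order.
	return b.stitchAndVerify(results)
}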
// hdrTipSlice is a slice that holds request startHeight of the responses that have been
// fetched using the workmanager to fetch headers within the chain's checkpointed region.
// It is used to easily access this startheight in the case we have to delete these responses
// in the hdrTipResponse map during a reorg while fetching headers within the chain's checkpointed
// region.
hdrTipSlice []int32
Generally, if you have to name something with a postfix of the type that the thing is, it is symptomatic that you haven't properly described to yourself what role the thing is playing.
is it fair to say that what you mean by "header tip" is that it is the start height of a particular batch? If so I strongly recommend not calling that the tip. In Bitcoin we typically think of the "tip" of the chain as the last item in that chain, NOT the first one.
I guess my reasoning behind this was that in the processBlKHeaderInCheckPtRegionInOrder function I am literally mapping the responses received at each header tip to the header tip itself, but I guess it can be refactored.
I'm not suggesting refactoring in this particular case. Though I think the PR overall would benefit from being broken up a bit more. In this case I'm more commenting on the fact that the name of this field doesn't describe the field's purpose very well, making it hard to figure out what it's being used for. Can you define for me precisely what a "header tip" is in the way you are using it?
Because at the point of writing into the map, I am literally mapping the response received at each header tip (i.e. the chain tip) to the chain tip itself, but if that is not clear I could rename the slice to whatever you suggest.
I think the comments added to the slice could help in understanding what the slice is about. Is the comment not clear as well? I think I should work on the comments.
Rename this to regionRoots since it represents the header root for each region. Note that the blockchain is a linked list pointing back towards genesis. The chain "tip" is the most recent block in the blockchain; correspondingly, the region's tip is the last link in that region. An appropriate name for the first link in that region would be "root" instead of "tip".
func cloneHeaderQuery(req query.ReqMessage) query.ReqMessage {
	oldReq, ok := req.(*headerQuery)
	if !ok {
		log.Errorf("request not of type *wire.MsgGetHeaders")
	}
	oldReqMessage := req.Message().(*wire.MsgGetHeaders)
	message := &headerQuery{
		message: &wire.MsgGetHeaders{
			BlockLocatorHashes: oldReqMessage.BlockLocatorHashes,
			HashStop:           oldReqMessage.HashStop,
		},
		startHeight:   oldReq.startHeight,
		initialHeight: oldReq.initialHeight,
		startHash:     oldReq.startHash,
		endHeight:     oldReq.endHeight,
	}

	return message
}
don't mix type downcasting with cloning, make them two separate functions
Are you talking about this?

oldReq, ok := req.(*headerQuery)
if !ok {
	log.Errorf("request not of type *wire.MsgGetHeaders")
}

It just seems too little to put in a different function.
It doesn't need to go in its own function. Go has decent enough ergonomics on downcasting types to do it at the call site where it matters. What I'm suggesting here is making it such that your clone operation only does cloning and only accepting the subtype that you are trying to clone.
Let's examine what this is actually doing. We are taking an umbrella type, and if it isn't the subtype we expect, we only log it; we do not return an error. Why would we want to log an error if we weren't going to halt execution? Further, why are we accepting a type that is "larger" than the type we actually want to clone? Functions should do one thing conceptually. This one is kind of doing three, and it's not clear why those three things are conceptually one thing.
So the Request struct is not for only one subtype. cloneHeaderQuery has to follow the function signature defined in the Request struct, so it is not clear to me how to limit it to one subtype only.
I think the concern of only logging the error is valid. I am thinking of a way to go about it. Perhaps make the function return an error as well.
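A minimal sketch of that idea, assuming the CloneReq signature could be changed to return an error as well; this is not the PR's current signature:

// Sketch only: the failed type assertion becomes a returned error
// instead of a log line that lets execution continue.
func cloneHeaderQuery(req query.ReqMessage) (query.ReqMessage, error) {
	oldReq, ok := req.(*headerQuery)
	if !ok {
		return nil, fmt.Errorf("expected *headerQuery, got %T", req)
	}

	oldMsg, ok := req.Message().(*wire.MsgGetHeaders)
	if !ok {
		return nil, fmt.Errorf("expected *wire.MsgGetHeaders, got %T",
			req.Message())
	}

	return &headerQuery{
		message: &wire.MsgGetHeaders{
			BlockLocatorHashes: oldMsg.BlockLocatorHashes,
			HashStop:           oldMsg.HashStop,
		},
		startHeight:   oldReq.startHeight,
		initialHeight: oldReq.initialHeight,
		startHash:     oldReq.startHash,
		endHeight:     oldReq.endHeight,
	}, nil
}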
request, _ := task.Message().(*wire.MsgGetHeaders)

requestMsg := task.(*headerQuery)
can you be sure this is a valid headerQuery?
Yeah I guess it can do with some type error handling. Thanks.
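For example, a small sketch of the check being agreed on here; the error return is an assumption about the enclosing function's signature:

// Sketch only: fail loudly instead of panicking on a bad assertion.
requestMsg, ok := task.(*headerQuery)
if !ok {
	return fmt.Errorf("expected *headerQuery, got %T", task)
}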
	return query.ErrIgnoreRequest
}

sp := peer.(*ServerPeer)
can you be sure this is a valid ServerPeer?
Yeah I guess it can do with some type error handling. Thanks.
// If the result is unfinished add 0.0005 to the job index to maintain the
// required priority then push to work queue.
if result.unfinished {
	result.job.index = result.job.Index() + 0.0005
add to your api a way to bump priority, or don't use the interface Index(). Mixing like this just breaks the value of doing an interface to begin with. Keep it consistent.
There is a way to bump priority already:
f168538#diff-2345cea8b12f07d1f60d42b7e5563720e69e1544c641261cf147668c4599320b
I explained it here:
#282 (comment)
But that does not apply here; I can change job.Index() to job.index. The workmanager already handles assigning indexes to jobs, so this is not new functionality.
case results <- &jobResult{
	job:  job,
	peer: peer,
	err:  err,
Is unfinished intentionally omitted here?
Yes, as bool is already false by default.
I think including it would make the code more readable
what about redundancy?
// processBlKHeaderInCheckPtRegionInOrder handles and writes the block headers received from querying the
// workmanager while fetching headers within the block header checkpoint region. This process is carried out
// in order.
func (b *blockManager) processBlKHeaderInCheckPtRegionInOrder() {
this name reflects a lack of clarity of purpose here. Judging from the rest of the context it seems like this is one of the main coroutines that the blockManager is responsible for running. I'd recommend carving out whatever the responsibility of this coroutine is and giving it its own structure rather than mixing it in with the rest of the block manager.
So create a different struct?
Maybe. First we need to figure out what this method is doing, what the minimal set of arguments would be and see if we can carve it off of the blockManager. It seems like the appropriate thing here would be to make this function apply to some sort of "batch" structure.
I think the comment on top of the function definition helps explain what it is about. IMO, this does not need a different struct; if it does, then the cfhandler function and the functions involved in writing the cfheaders to the store in order should be in a different struct as well, since this is the block header version of that. I also think the name of the function helps explain what it does: it is literally "processing block headers within the checkpoint region in order".
Thanks for taking the time to review this code and offer your invaluable suggestions. I guess if a lot of people are finding it hard to follow the code, then that is a cause for concern. This PR basically just makes adjustments to the query package to enable the workmanager to orchestrate fetching block headers across various peers in a non-synchronous way. The pull request can be broken down into two parts:
Trying to reimagine and address your various concerns, I have responded to your comments on the code. I would be happy to understand your perspective better and use your and the community's invaluable insights to help make this PR better.
func (sp *ServerPeer) IsPeerBehindStartHeight(req query.ReqMessage) bool {
	queryGetHeaders, ok := req.(*headerQuery)

	if !ok {
		log.Debugf("request is not type headerQuery")

		return true
	}

	if sp.LastBlock() < queryGetHeaders.startHeight {
		return true
	}
	return false
}
Just pass in the start height: this function should take two arguments, the server peer and the start height. This is also doing a downcasting operation, which clutters the logic; do that at the call site. You are duplicating the downcasting logic in a lot of places and it isn't necessary. Also, returning true in the case that the downcast fails clashes strongly with the name here. This function could just be:

func (sp *ServerPeer) IsPeerBehindStartHeight(h int32) bool {
	return sp.LastBlock() < h
}
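And a sketch of what the call site might then look like, assuming ErrIgnoreRequest is still the right reaction when the request is not a header query:

// Sketch only: the downcast happens once, where a failure can be
// handled explicitly, and the peer check itself stays trivial.
hq, ok := task.(*headerQuery)
if !ok {
	return query.ErrIgnoreRequest
}
if sp.IsPeerBehindStartHeight(hq.startHeight) {
	return query.ErrIgnoreRequest
}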
// CloneReq clones the message.
CloneReq func(message ReqMessage) ReqMessage
But why do we need that?
I explained it in the code. Reading the commit message that introduced this change would also help:
dd02e22
I don't think you should be modifying the request. It seems you're using a common structure for what ought to be two different data types. One represents the request itself, the top level job. The other represents the remaining work on that job. The latter is conceptually an in-progress job. I think you should label it as such and then rather than talking about it as a "clone", it's really a construction of a "InProgressRequest" or something to that effect.
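A rough sketch of the split being suggested, with hypothetical type and field names (the chainhash import is assumed):

// Sketch only: the immutable job description stays in query.Request,
// while the mutable bookkeeping for a running job lives separately.
type inProgressHeaderJob struct {
	// original is the job as it was submitted; it is never modified.
	original *query.Request

	// nextStartHeight and nextStartHash record how far the peer's
	// responses have advanced this job so far.
	nextStartHeight int32
	nextStartHash   chainhash.Hash
}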
// addNewPeerToList adds the peer to the peers list.
func (b *blockManager) addNewPeerToList(peers *list.List, sp *ServerPeer) {
	// Ignore if in the process of shutting down.
	if atomic.LoadInt32(&b.shutdown) != 0 {
		return
	}

	log.Infof("New valid peer %s (%s)", sp, sp.UserAgent())

	// Add the peer as a candidate to sync from.
	peers.PushBack(sp)
}
why is this on the blockManager? and why is it checking for shutdown rather than just doing the thing?
So I think this particular change and that of removeDonePeerFromList should be in a different commit. This function is an extraction of this:

Lines 367 to 381 in 42a196f

func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *ServerPeer) {
	// Ignore if in the process of shutting down.
	if atomic.LoadInt32(&b.shutdown) != 0 {
		return
	}
	log.Infof("New valid peer %s (%s)", sp, sp.UserAgent())
	// Ignore the peer if it's not a sync candidate.
	if !b.isSyncCandidate(sp) {
		return
	}
	// Add the peer as a candidate to sync from.
	peers.PushBack(sp)

It was extracted because the block handler only needs to carry on with that when a new peer is added while fetching checkpointed block headers.
This extraction is rather arbitrary, I cannot think of a good reason why a function whose stated purpose is to add the peer to the list needs to check for shutdown. I have checked all of the callsites of this function and all of them have redundant shutdown logic, so please remove the shutdown logic from this method and then remove the blockManager receiver argument.
Looking further, I understand why you did this, but that doesn't mean we should let it remain. handleNewPeerMsg not taking a message is wrong; we need to fix both of them. I'm aware this precedes your involvement, but it should be kept consistent and move in the direction of greater readability.
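For reference, a sketch of the reduced helper the comments above are pointing towards; whether it should remain a method at all is left open:

// Sketch only: no shutdown check and no blockManager receiver, since
// neither is used for the job of adding a peer to the list.
func addNewPeerToList(peers *list.List, sp *ServerPeer) {
	log.Infof("New valid peer %s (%s)", sp, sp.UserAgent())

	// Add the peer as a candidate to sync from.
	peers.PushBack(sp)
}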
// removeDonePeerFromList removes the peer from the peers list.
func (b *blockManager) removeDonePeerFromList(peers *list.List, sp *ServerPeer) {
why is this on the blockManager?
Extraction of this:

Lines 426 to 434 in 42a196f

// Remove the peer from the list of candidate peers.
for e := peers.Front(); e != nil; e = e.Next() {
	if e.Value == sp {
		peers.Remove(e)
		break
	}
}
log.Infof("Lost peer %s", sp)
It contains no references to the blockManager in the function body and is therefore a spurious argument. Do not accept arguments you do not use.
func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) {
	// Remove the peer from the list of candidate peers.
	b.removeDonePeerFromList(peers, sp)

	log.Infof("Lost peer %s", sp)
There's no "Msg" in any of the arguments here. The function signature does not give good indication of what this is doing.
I did not write that function signature. It was approved and merged into the codebase and it is currently present there.
For consistency, change the function signature of this function to take a donePeerMsg and change the call site to pass the entire message in. Alternatively, rename the function to better describe what it's doing. The latter is more ambitious, given that it is hard to concisely describe the process of both removing the peer and searching for a new sync peer; I do not recommend doing this unless you have the patience to make the refactor much larger.
// cloneMsgCFHeaders clones query.ReqMessage that contains the MsgGetCFHeaders message.
func cloneMsgCFHeaders(req query.ReqMessage) query.ReqMessage {
	oldReq, ok := req.(*encodedQuery)
	if !ok {
		log.Errorf("request not of type *encodedQuery")
	}
	oldReqMessage, ok := oldReq.message.(*wire.MsgGetCFHeaders)
	if !ok {
		log.Errorf("request not of type *wire.MsgGetCFHeaders")
	}
	newReq := &encodedQuery{
		message: wire.NewMsgGetCFHeaders(
			oldReqMessage.FilterType, oldReqMessage.StartHeight, &oldReqMessage.StopHash,
		),
		encoding:      oldReq.encoding,
		priorityIndex: oldReq.priorityIndex,
	}
	return newReq
This has pretty bad undefined behavior if your downcast fails. I pretty strongly object to this design style.
As stated in another one of my comments, you almost certainly do not intend to clone this object. What you want is a new data structure that captures the "in progress" state you will be continually modifying and the original job description to remain unchanged. This allows you to separate the data that represents the job description from the data that you need for bookkeeping on jobs that are currently in progress.
Finally, you should use interface downcasting very sparingly. I'd almost recommend avoiding it altogether unless you're doing it for interfaces that model algebraic sum types. If you don't know what to do instead, see if you can add it to the interface definition, or better yet, just write the function over the concrete type.
@yyforyongyu: review reminder
Closing due to inactivity
!lightninglabs-deploy mute
This pull request parallelizes header download across peers within a chain's checkpointed region.
If neutrino's chain has checkpoints and its header tip is within that checkpointed region, the blockmanager batches multiple getheaders requests and sends them to the work manager. Even though we fetch in parallel, we must validate and write to the DB in order; the processBlKHeaderInCheckPtRegionInOrder loop handles the fetched headers and writes them to the DB in order. After fetching headers in parallel within the checkpointed region, the headers in the uncheckpointed region are fetched synchronously as before. The changes are detailed in the commit messages.
Fixes #71