-
Notifications
You must be signed in to change notification settings - Fork 183
/
blockmanager.go
3046 lines (2620 loc) · 91.9 KB
/
blockmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// NOTE: THIS API IS UNSTABLE RIGHT NOW AND WILL GO MOSTLY PRIVATE SOON.
package neutrino
import (
"bytes"
"container/list"
"fmt"
"math"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/gcs"
"github.com/btcsuite/btcd/btcutil/gcs/builder"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/banman"
"github.com/lightninglabs/neutrino/blockntfns"
"github.com/lightninglabs/neutrino/chainsync"
"github.com/lightninglabs/neutrino/headerfs"
"github.com/lightninglabs/neutrino/headerlist"
"github.com/lightninglabs/neutrino/query"
)
const (
	// numMaxMemHeaders caps how many headers are kept in memory for a
	// single peer. Bounding it gives us close control over our effective
	// memory usage during initial sync and re-org handling. It should be
	// set to a "sane" re-org size, so that re-orgs strictly smaller than
	// this value can be handled properly.
	numMaxMemHeaders = 10000

	// retryTimeout is how long we pause between failed queries for filter
	// checkpoints and headers.
	retryTimeout = time.Second * 3

	// maxCFCheckptsPerQuery is the largest number of filter header
	// checkpoints that a single message over the wire can span.
	maxCFCheckptsPerQuery = wire.MaxCFHeadersPerMsg / wire.CFCheckptInterval
)
// zeroHash is the all-zero hash value, kept around purely as a convenience.
var zeroHash = chainhash.Hash{}
// newPeerMsg tells the block handler that a peer has newly connected.
type newPeerMsg struct {
	// peer is the peer that connected.
	peer *ServerPeer
}
// invMsg bundles a bitcoin inv message with the peer that sent it, so the
// block handler has both pieces of information at hand.
type invMsg struct {
	// inv is the received inventory message.
	inv *wire.MsgInv

	// peer is the peer the message came from.
	peer *ServerPeer
}
// headersMsg bundles a bitcoin headers message with the peer that sent it, so
// the block handler has both pieces of information at hand.
type headersMsg struct {
	// headers is the received headers message.
	headers *wire.MsgHeaders

	// peer is the peer the message came from.
	peer *ServerPeer
}
// donePeerMsg tells the block handler that a peer has newly disconnected.
type donePeerMsg struct {
	// peer is the peer that disconnected.
	peer *ServerPeer
}
// blockManagerCfg bundles the options and dependencies that a blockManager
// relies on while it is running.
type blockManagerCfg struct {
	// ChainParams describes the chain we are running on.
	ChainParams chaincfg.Params

	// BlockHeaders is the persistent store for block headers.
	BlockHeaders headerfs.BlockHeaderStore

	// RegFilterHeaders is the persistent store for the filter headers of
	// the regular compact filters.
	RegFilterHeaders *headerfs.FilterHeaderStore

	// TimeSource yields a time estimate derived from the clocks of the
	// connected peers.
	TimeSource blockchain.MedianTimeSource

	// QueryDispatcher is used to issue queries to connected Bitcoin
	// peers.
	QueryDispatcher query.Dispatcher

	// BanPeer bans and disconnects the peer with the given address.
	BanPeer func(addr string, reason banman.Reason) error

	// GetBlock retrieves a block from the p2p network.
	GetBlock func(chainhash.Hash, ...QueryOption) (*btcutil.Block, error)

	// firstPeerSignal is signalled once the main daemon has made its
	// first peer connection. It lets us hold off on any queries until at
	// least one peer is available.
	firstPeerSignal <-chan struct{}

	// queryAllPeers accepts a query message, a callback that receives a
	// peer's response along with two quit channels, and optional query
	// options.
	//
	// NOTE(review): presumably this sends queryMsg to every connected
	// peer and runs checkResponse on each reply; confirm against the
	// implementation supplied by the caller.
	queryAllPeers func(
		queryMsg wire.Message,
		checkResponse func(sp *ServerPeer, resp wire.Message,
			quit chan<- struct{}, peerQuit chan<- struct{}),
		options ...QueryOption)
}
// blockManager provides a concurrency safe block manager for handling all
// incoming blocks.
type blockManager struct { // nolint:maligned
	// started is bumped by Start and must only be accessed atomically.
	started int32

	// shutdown is bumped by Stop and must only be accessed atomically.
	shutdown int32

	// cfg holds the options and dependencies of this block manager.
	cfg *blockManagerCfg

	// blkHeaderProgressLogger is the progress logger used to report, in
	// the log, how many block headers were processed in the past 10
	// seconds.
	blkHeaderProgressLogger *headerProgressLogger

	// fltrHeaderProgessLogger is a progress logger like the one above,
	// but it reports the progress of the filter headers verified in the
	// past 10 seconds.
	fltrHeaderProgessLogger *headerProgressLogger

	// genesisHeader is the filter header of the genesis block.
	genesisHeader chainhash.Hash

	// headerTip always holds the height of the current block header tip.
	// Callers MUST hold newHeadersMtx whenever they read or write this
	// field.
	headerTip uint32

	// headerTipHash always holds the hash of the current block header
	// tip. Callers MUST hold newHeadersMtx whenever they read or write
	// this field.
	headerTipHash chainhash.Hash

	// newHeadersMtx must be held when reading or writing the header tip
	// fields above.
	//
	// NOTE: When this mutex is used together with newFilterHeadersMtx,
	// newHeadersMtx should always be acquired first.
	newHeadersMtx sync.RWMutex

	// newHeadersSignal is the condition variable used to notify waiting
	// callers (via Broadcast()) that the tip of the current chain has
	// changed. It serves callers that need to learn about a new tip, but
	// not necessarily about every block connected during the switch
	// over.
	newHeadersSignal *sync.Cond

	// filterHeaderTip always holds the height of the current filter
	// header tip. Callers MUST hold newFilterHeadersMtx whenever they
	// read or write this field.
	filterHeaderTip uint32

	// filterHeaderTipHash always holds the hash of the block at height
	// filterHeaderTip. Callers MUST hold newFilterHeadersMtx whenever
	// they read or write this field.
	filterHeaderTipHash chainhash.Hash

	// newFilterHeadersMtx must be held when reading or writing the
	// filter header tip fields above.
	//
	// NOTE: When this mutex is used together with newHeadersMtx,
	// newHeadersMtx should always be acquired first.
	newFilterHeadersMtx sync.RWMutex

	// newFilterHeadersSignal is the condition variable used to notify
	// waiting callers (via Broadcast()) that the tip of the current
	// filter header chain has changed. It serves callers that need to
	// learn about a new tip, but not necessarily about every filter
	// header connected during the switch over.
	newFilterHeadersSignal *sync.Cond

	// syncPeer is the peer we are currently syncing block headers from.
	syncPeer *ServerPeer

	// syncPeerMutex guards the syncPeer pointer at all times.
	syncPeerMutex sync.RWMutex

	// peerChan carries the messages that originate from peers.
	peerChan chan interface{}

	// blockNtfnChan is the channel on which the latest block
	// notifications for the tip of the chain are sent.
	blockNtfnChan chan blockntfns.BlockNtfn

	// wg tracks the goroutines launched by Start so Stop can wait for
	// them.
	wg sync.WaitGroup

	// quit is closed by Stop to ask all goroutines to exit.
	quit chan struct{}

	// headerList and reorgList are in-memory header chains, each created
	// with a bound of numMaxMemHeaders entries.
	headerList headerlist.Chain
	reorgList  headerlist.Chain

	startHeader    *headerlist.Node
	nextCheckpoint *chaincfg.Checkpoint
	lastRequested  chainhash.Hash

	// minRetargetTimespan is target timespan / adjustment factor.
	minRetargetTimespan int64

	// maxRetargetTimespan is target timespan * adjustment factor.
	maxRetargetTimespan int64

	// blocksPerRetarget is target timespan / target time per block.
	blocksPerRetarget int32
}
// newBlockManager builds a bitcoin block manager from cfg. The returned
// manager is idle until Start is called, at which point it begins processing
// asynchronous block and inv updates.
//
// The header tip and filter header tip are seeded from the respective stores;
// any store error aborts construction and is returned unchanged.
func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) {
	// Express the chain's retarget parameters in whole seconds.
	var (
		timespanSecs  = int64(cfg.ChainParams.TargetTimespan / time.Second)
		blockTimeSecs = int64(cfg.ChainParams.TargetTimePerBlock / time.Second)
		factor        = cfg.ChainParams.RetargetAdjustmentFactor
	)

	bm := &blockManager{
		cfg:           cfg,
		peerChan:      make(chan interface{}, MaxPeers*3),
		blockNtfnChan: make(chan blockntfns.BlockNtfn),
		blkHeaderProgressLogger: newBlockProgressLogger(
			"Processed", "block", log,
		),
		fltrHeaderProgessLogger: newBlockProgressLogger(
			"Verified", "filter header", log,
		),
		headerList: headerlist.NewBoundedMemoryChain(
			numMaxMemHeaders,
		),
		reorgList: headerlist.NewBoundedMemoryChain(
			numMaxMemHeaders,
		),
		quit:                make(chan struct{}),
		blocksPerRetarget:   int32(timespanSecs / blockTimeSecs),
		minRetargetTimespan: timespanSecs / factor,
		maxRetargetTimespan: timespanSecs * factor,
	}

	// Bind each condition variable to its mutex. Goroutines use these to
	// wait for a particular header chain height before they start their
	// normal duties.
	bm.newHeadersSignal = sync.NewCond(&bm.newHeadersMtx)
	bm.newFilterHeadersSignal = sync.NewCond(&bm.newFilterHeadersMtx)

	// The genesis filter header is needed to verify the first interval of
	// filter headers we receive.
	genesisFilterHdr, err := cfg.RegFilterHeaders.FetchHeaderByHeight(0)
	if err != nil {
		return nil, err
	}
	bm.genesisHeader = *genesisFilterHdr

	// Seed the next checkpoint and the in-memory header state from the
	// current block header tip.
	tipHeader, tipHeight, err := cfg.BlockHeaders.ChainTip()
	if err != nil {
		return nil, err
	}
	bm.nextCheckpoint = bm.findNextHeaderCheckpoint(int32(tipHeight))
	bm.headerList.ResetHeaderState(headerlist.Node{
		Header: *tipHeader,
		Height: int32(tipHeight),
	})
	bm.headerTip = tipHeight
	bm.headerTipHash = tipHeader.BlockHash()

	// Record the filter header tip so that goroutines waiting on the
	// condition start out from the correct state.
	_, bm.filterHeaderTip, err = cfg.RegFilterHeaders.ChainTip()
	if err != nil {
		return nil, err
	}

	// The filter header tip hash is the hash of the *block* at the filter
	// tip height, so look that block header up.
	filterTipBlock, err := cfg.BlockHeaders.FetchHeaderByHeight(
		bm.filterHeaderTip,
	)
	if err != nil {
		return nil, err
	}
	bm.filterHeaderTipHash = filterTipBlock.BlockHash()

	return bm, nil
}
// Start launches the core block handler, which processes block and inv
// messages, together with a goroutine that runs the cfHandler once the first
// peer has connected. Only the first call has any effect.
func (b *blockManager) Start() {
	// Every call bumps the counter; only the very first one proceeds.
	if n := atomic.AddInt32(&b.started, 1); n != 1 {
		return
	}

	log.Trace("Starting block manager")

	// runCFHandler holds the cfHandler back until we are connected to at
	// least one peer, or gives up if we are asked to quit first.
	runCFHandler := func() {
		defer b.wg.Done()

		log.Debug("Waiting for peer connection...")

		select {
		case <-b.quit:
			return
		case <-b.cfg.firstPeerSignal:
		}

		log.Debug("Peer connected, starting cfHandler.")
		b.cfHandler()
	}

	// Two wait group slots: one is released by runCFHandler above, the
	// other presumably by blockHandler itself (not visible here).
	b.wg.Add(2)
	go b.blockHandler()
	go runCFHandler()
}
// Stop gracefully shuts the block manager down: it asks every asynchronous
// handler to stop and waits for them to finish. Only the first call does any
// work; later calls log a warning. It always returns nil.
func (b *blockManager) Stop() error {
	if atomic.AddInt32(&b.shutdown, 1) != 1 {
		log.Warnf("Block manager is already in the process of " +
			"shutting down")
		return nil
	}

	// Goroutines may be parked on one of the condition variables. Keep
	// waking them at a fixed interval until all handlers have exited, so
	// they get the chance to notice the closed quit channel.
	done := make(chan struct{})
	defer close(done)

	go func() {
		wake := time.NewTicker(50 * time.Millisecond)
		defer wake.Stop()

		for {
			select {
			case <-wake.C:
				b.newHeadersSignal.Broadcast()
				b.newFilterHeadersSignal.Broadcast()

			case <-done:
				return
			}
		}
	}()

	log.Infof("Block manager shutting down")
	close(b.quit)
	b.wg.Wait()

	return nil
}
// NewPeer informs the block manager of a newly active peer. The notification
// is dropped if the manager is shutting down.
func (b *blockManager) NewPeer(sp *ServerPeer) {
	if shuttingDown := atomic.LoadInt32(&b.shutdown) != 0; shuttingDown {
		return
	}

	// Hand the message to the handler, unless we are told to quit while
	// waiting for room in the channel.
	msg := &newPeerMsg{peer: sp}
	select {
	case <-b.quit:
	case b.peerChan <- msg:
	}
}
// handleNewPeerMsg processes a peer that has signalled it may be considered as
// a sync peer (it has already successfully negotiated). Sync candidates are
// appended to peers, asked for headers when they advertise a higher block than
// our tip while we are current, and finally a sync is started if needed. It is
// invoked from the syncHandler goroutine.
func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *ServerPeer) {
	// Nothing to do once shutdown has begun.
	if atomic.LoadInt32(&b.shutdown) != 0 {
		return
	}

	log.Infof("New valid peer %s (%s)", sp, sp.UserAgent())

	// Peers that cannot serve as a sync candidate are ignored; all others
	// join the candidate list.
	if !b.isSyncCandidate(sp) {
		return
	}
	peers.PushBack(sp)

	_, tipHeight, err := b.cfg.BlockHeaders.ChainTip()
	if err != nil {
		log.Criticalf("Couldn't retrieve block header chain tip: %s",
			err)
		return
	}

	// When we are current with our sync peer and the new peer advertises
	// a block beyond the newest one we know of, ask it for headers.
	peerIsAhead := tipHeight < uint32(sp.StartingHeight())
	if peerIsAhead && b.BlockHeadersSynced() {
		locator, err := b.cfg.BlockHeaders.LatestBlockLocator()
		if err != nil {
			log.Criticalf("Couldn't retrieve latest block "+
				"locator: %s", err)
			return
		}

		// The result of the push is intentionally discarded.
		_ = sp.PushGetHeadersMsg(locator, &zeroHash)
	}

	// Pick the best candidate and start syncing, if that is needed.
	b.startSync(peers)
}
// DonePeer informs the block manager that a peer has disconnected. The
// notification is dropped if the manager is shutting down.
func (b *blockManager) DonePeer(sp *ServerPeer) {
	if shuttingDown := atomic.LoadInt32(&b.shutdown) != 0; shuttingDown {
		return
	}

	// Hand the message to the handler, unless we are told to quit while
	// waiting for room in the channel.
	msg := &donePeerMsg{peer: sp}
	select {
	case <-b.quit:
	case b.peerChan <- msg:
	}
}
// handleDonePeerMsg deals with peers that have signalled they are done. It
// removes the peer as a candidate for syncing and, in the case where it was
// the current sync peer, clears the sync peer, resets the in-memory header
// state to the stored chain tip and attempts to select a new best peer to sync
// from. It is invoked from the syncHandler goroutine.
//
// If the chain tip cannot be read after the sync peer was cleared, the error
// is logged and no new sync is started.
func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) {
	// Remove the peer from the list of candidate peers.
	for e := peers.Front(); e != nil; e = e.Next() {
		if e.Value == sp {
			peers.Remove(e)
			break
		}
	}

	log.Infof("Lost peer %s", sp)

	// Nothing more to do unless the quitting peer is the sync peer. The
	// sync peer is read once so that both comparisons see the same value.
	if cur := b.SyncPeer(); cur == nil || cur != sp {
		return
	}

	b.syncPeerMutex.Lock()
	b.syncPeer = nil
	b.syncPeerMutex.Unlock()

	// Reset the header state to the stored tip before picking a new peer.
	header, height, err := b.cfg.BlockHeaders.ChainTip()
	if err != nil {
		// Without the tip we cannot reset the header state. Log the
		// failure rather than bailing out silently, since we are now
		// left without a sync peer.
		log.Criticalf("Couldn't retrieve block header chain tip: %s",
			err)
		return
	}
	b.headerList.ResetHeaderState(headerlist.Node{
		Header: *header,
		Height: int32(height),
	})

	b.startSync(peers)
}
// cfHandler is the cfheader download handler for the block manager. It must be
// run as a goroutine. It requests and processes cfheaders messages in a
// separate goroutine from the peer handlers.
//
// The handler proceeds in three stages:
//
//  1. It waits until the block headers are either synced or at least one
//     checkpoint interval ahead of the filter headers.
//  2. It fetches filter header checkpoints from peers, resolves conflicts
//     between them and downloads the checkpointed filter headers. Stages 1
//     and 2 repeat (via the waitForHeaders label) until the filter headers
//     are within one checkpoint interval of a synced block header tip.
//  3. It then loops at the tip, fetching un-checkpointed filter headers every
//     time the block header tip moves away from the filter header tip.
//
// The handler returns when b.quit is closed, or when the block header chain
// tip cannot be read.
func (b *blockManager) cfHandler() {
defer log.Trace("Committed filter header handler done")
var (
// allCFCheckpoints is a map from our peers to the list of
// filter checkpoints they respond to us with. We'll attempt to
// get filter checkpoints immediately up to the latest block
// checkpoint we've got stored to avoid doing unnecessary
// fetches as the block headers are catching up.
allCFCheckpoints map[string][]*chainhash.Hash
// lastCp will point to the latest block checkpoint we have for
// the active chain, if any.
lastCp chaincfg.Checkpoint
// blockCheckpoints is the list of block checkpoints for the
// active chain.
blockCheckpoints = b.cfg.ChainParams.Checkpoints
)
// Set the variable to the latest block checkpoint if we have any for
// this chain. Otherwise this block checkpoint will just stay at height
// 0, which will prompt us to look at the block headers to fetch
// checkpoints below.
if len(blockCheckpoints) > 0 {
lastCp = blockCheckpoints[len(blockCheckpoints)-1]
}
waitForHeaders:
// We'll wait until the main header sync is either finished or the
// filter headers are lagging at least a checkpoint interval behind the
// block headers, before we actually start to sync the set of
// cfheaders. We do this to speed up the sync, as the checkpointed
// sync is faster than fetching each header from each peer during the
// normal "at tip" syncing.
//
// NOTE(review): b.filterHeaderTip is read below for logging without
// holding newFilterHeadersMtx, although the field's documentation asks
// callers to hold that lock. Confirm whether this is intentional.
log.Infof("Waiting for more block headers, then will start "+
"cfheaders sync from height %v...", b.filterHeaderTip)
// newHeadersSignal.L is taken before newFilterHeadersMtx, matching the
// lock order documented on the blockManager struct.
b.newHeadersSignal.L.Lock()
b.newFilterHeadersMtx.RLock()
for !(b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip || b.BlockHeadersSynced()) {
// Release the filter header lock before parking on the condition
// variable, so it is not held while we sleep.
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.Wait()
// While we're awake, we'll quickly check to see if we need to
// quit early.
select {
case <-b.quit:
b.newHeadersSignal.L.Unlock()
return
default:
}
// Re-acquire the lock in order to check for the filter header
// tip at the next iteration of the loop.
b.newFilterHeadersMtx.RLock()
}
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.L.Unlock()
// Now that the block headers are finished or ahead of the filter
// headers, we'll grab the current chain tip so we can base our filter
// header sync off of that.
lastHeader, lastHeight, err := b.cfg.BlockHeaders.ChainTip()
if err != nil {
log.Critical(err)
return
}
lastHash := lastHeader.BlockHash()
b.newFilterHeadersMtx.RLock()
log.Infof("Starting cfheaders sync from (block_height=%v, "+
"block_hash=%v) to (block_height=%v, block_hash=%v)",
b.filterHeaderTip, b.filterHeaderTipHash, lastHeight,
lastHeader.BlockHash())
b.newFilterHeadersMtx.RUnlock()
fType := wire.GCSFilterRegular
store := b.cfg.RegFilterHeaders
log.Infof("Starting cfheaders sync for filter_type=%v", fType)
// If we have less than a full checkpoint's worth of blocks, such as on
// simnet, we don't really need to request checkpoints as we'll get 0
// from all peers. We can go on and just request the cfheaders.
var goodCheckpoints []*chainhash.Hash
for len(goodCheckpoints) == 0 && lastHeight >= wire.CFCheckptInterval {
// Quit if requested.
select {
case <-b.quit:
return
default:
}
// If the height now exceeds the height at which we fetched the
// checkpoints last time, we must query our peers again.
if minCheckpointHeight(allCFCheckpoints) < lastHeight {
// Start by getting the filter checkpoints up to the
// height of our block header chain. If we have a chain
// checkpoint that is past this height, we use that
// instead. We do this so we don't have to fetch all
// filter checkpoints each time our block header chain
// advances.
// TODO(halseth): fetch filter checkpoints up to the
// best block of the connected peers.
bestHeight := lastHeight
bestHash := lastHash
if bestHeight < uint32(lastCp.Height) {
bestHeight = uint32(lastCp.Height)
bestHash = *lastCp.Hash
}
log.Debugf("Getting filter checkpoints up to "+
"height=%v, hash=%v", bestHeight, bestHash)
allCFCheckpoints = b.getCheckpts(&bestHash, fType)
if len(allCFCheckpoints) == 0 {
log.Warnf("Unable to fetch set of " +
"candidate checkpoints, trying again...")
// Back off before retrying, unless we are asked to quit.
select {
case <-time.After(retryTimeout):
case <-b.quit:
return
}
continue
}
}
// Cap the received checkpoints at the current height, as we
// can only verify checkpoints up to the height we have block
// headers for.
checkpoints := make(map[string][]*chainhash.Hash)
for p, cps := range allCFCheckpoints {
for i, cp := range cps {
// The checkpoint at index i sits at height
// (i+1) * wire.CFCheckptInterval.
height := uint32(i+1) * wire.CFCheckptInterval
if height > lastHeight {
break
}
checkpoints[p] = append(checkpoints[p], cp)
}
}
// See if we can detect which checkpoint list is correct. If
// not, we will cycle again.
goodCheckpoints, err = b.resolveConflict(
checkpoints, store, fType,
)
if err != nil {
log.Warnf("got error attempting to determine correct "+
"cfheader checkpoints: %v, trying again", err)
}
// Without a usable checkpoint list, back off before the next
// attempt, unless we are asked to quit.
if len(goodCheckpoints) == 0 {
select {
case <-time.After(retryTimeout):
case <-b.quit:
return
}
}
}
// Get all the headers up to the last known good checkpoint.
b.getCheckpointedCFHeaders(
goodCheckpoints, store, fType,
)
// Now we check the headers again. If the block headers are not yet
// current, then we go back to the loop waiting for them to finish.
if !b.BlockHeadersSynced() {
goto waitForHeaders
}
// If block headers are current, but the filter header tip is still
// lagging more than a checkpoint interval behind the block header tip,
// we also go back to the loop to utilize the faster checkpointed
// fetching.
b.newHeadersMtx.RLock()
b.newFilterHeadersMtx.RLock()
if b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip {
b.newFilterHeadersMtx.RUnlock()
b.newHeadersMtx.RUnlock()
goto waitForHeaders
}
b.newFilterHeadersMtx.RUnlock()
b.newHeadersMtx.RUnlock()
log.Infof("Fully caught up with cfheaders at height "+
"%v, waiting at tip for new blocks", lastHeight)
// Now that we've been fully caught up to the tip of the current header
// chain, we'll wait here for a signal that more blocks have been
// connected. If this happens then we'll do another round to fetch the
// new set of filter headers.
for {
// We'll wait until the filter header tip and the header tip
// are mismatched.
b.newHeadersSignal.L.Lock()
b.newFilterHeadersMtx.RLock()
for b.filterHeaderTipHash == b.headerTipHash {
// We'll wait here until we're woken up by the
// broadcast signal.
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.Wait()
// Before we proceed, we'll check if we need to exit at
// all.
select {
case <-b.quit:
b.newHeadersSignal.L.Unlock()
return
default:
}
// Re-acquire the lock in order to check for the filter
// header tip at the next iteration of the loop.
b.newFilterHeadersMtx.RLock()
}
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.L.Unlock()
// At this point, we know that there is a set of new filter
// headers to fetch, so we'll grab them now.
if err = b.getUncheckpointedCFHeaders(
store, fType,
); err != nil {
log.Debugf("couldn't get uncheckpointed headers for "+
"%v: %v", fType, err)
// Back off before the next attempt, unless we are asked to
// quit.
select {
case <-time.After(retryTimeout):
case <-b.quit:
return
}
}
// Quit if requested.
select {
case <-b.quit:
return
default:
}
}
}
// getUncheckpointedCFHeaders gets the next batch of cfheaders from the
// network, if it can, and resolves any conflicts between them. It then writes
// any verified headers to the store.
//
// It returns nil when the filter headers are already at the block header
// height or when the fetched headers were written successfully. It returns an
// error when a chain tip cannot be read (the underlying error is wrapped),
// when the block header tip is below the filter header tip, when no usable
// response remains after banning misbehaving peers, or when detecting bad
// peers or writing the headers fails.
func (b *blockManager) getUncheckpointedCFHeaders(
	store *headerfs.FilterHeaderStore, fType wire.FilterType) error {

	// Get the filter header store's chain tip.
	filterTip, filtHeight, err := store.ChainTip()
	if err != nil {
		return fmt.Errorf("error getting filter chain tip: %w", err)
	}
	blockHeader, blockHeight, err := b.cfg.BlockHeaders.ChainTip()
	if err != nil {
		return fmt.Errorf("error getting block chain tip: %w", err)
	}

	// If the block height is somehow before the filter height, then this
	// means that we may still be handling a re-org, so we'll bail out so
	// we can retry after a timeout.
	if blockHeight < filtHeight {
		return fmt.Errorf("reorg in progress, waiting to get "+
			"uncheckpointed cfheaders (block height %d, filter "+
			"height %d)", blockHeight, filtHeight)
	}

	// If the heights match, then we're fully synced, so we don't need to
	// do anything from there.
	if blockHeight == filtHeight {
		log.Tracef("cfheaders already caught up to blocks")
		return nil
	}

	log.Infof("Attempting to fetch set of un-checkpointed filters "+
		"at height=%v, hash=%v", blockHeight, blockHeader.BlockHash())

	// Query all peers for the responses.
	startHeight := filtHeight + 1
	headers, numHeaders := b.getCFHeadersForAllPeers(startHeight, fType)

	// Ban any peer that responds with the wrong prev filter header.
	// Deleting entries from a map while ranging over it is safe in Go.
	for peer, msg := range headers {
		if msg.PrevFilterHeader != *filterTip {
			err := b.cfg.BanPeer(peer, banman.InvalidFilterHeader)
			if err != nil {
				log.Errorf("Unable to ban peer %v: %v", peer, err)
			}
			delete(headers, peer)
		}
	}
	if len(headers) == 0 {
		return fmt.Errorf("couldn't get cfheaders from peers")
	}

	// For each header, go through and check whether all headers messages
	// have the same filter hash. If we find a difference, get the block,
	// calculate the filter, and throw out any mismatching peers.
	for i := 0; i < numHeaders; i++ {
		if !checkForCFHeaderMismatch(headers, i) {
			continue
		}

		targetHeight := startHeight + uint32(i)
		badPeers, err := b.detectBadPeers(
			headers, targetHeight, uint32(i), fType,
		)
		if err != nil {
			return err
		}

		log.Warnf("Banning %v peers due to invalid filter "+
			"headers", len(badPeers))

		for _, peer := range badPeers {
			err := b.cfg.BanPeer(
				peer, banman.InvalidFilterHeader,
			)
			if err != nil {
				log.Errorf("Unable to ban peer %v: %v",
					peer, err)
			}
			delete(headers, peer)
		}
	}

	// Get the longest filter hash chain and write it to the store.
	key, maxLen := "", 0
	for peer, msg := range headers {
		if len(msg.FilterHashes) > maxLen {
			key, maxLen = peer, len(msg.FilterHashes)
		}
	}

	// We'll now fetch the set of pristine headers from the map. If ALL the
	// peers were banned, then we won't have a set of headers at all. In
	// that case we return an error, so the caller can retry and fetch
	// from a new set of peers.
	pristineHeaders, ok := headers[key]
	if !ok {
		return fmt.Errorf("all peers served bogus headers, retrying " +
			"with new set")
	}

	_, _, err = b.writeCFHeadersMsg(pristineHeaders, store)
	return err
}
// checkpointedCFHeadersQuery carries everything needed to perform and handle
// a query for checkpointed filter headers.
type checkpointedCFHeadersQuery struct {
	// blockMgr is the block manager the query belongs to. Its genesis
	// filter header, BanPeer callback and quit channel are used while
	// handling responses.
	blockMgr *blockManager

	// msgs are the request messages; one query.Request is created per
	// message.
	msgs []wire.Message

	// checkpoints is the list of filter header checkpoints that responses
	// are verified against.
	checkpoints []*chainhash.Hash

	// stopHashes maps the stop hash of a request to the checkpoint index
	// that the request starts from.
	stopHashes map[chainhash.Hash]uint32

	// headerChan receives every response that passed verification.
	headerChan chan *wire.MsgCFHeaders
}
// requests builds the query.Requests for this CF headers query: one request
// per message, each handled by handleResponse.
func (c *checkpointedCFHeadersQuery) requests() []*query.Request {
	reqs := make([]*query.Request, 0, len(c.msgs))
	for _, msg := range c.msgs {
		reqs = append(reqs, &query.Request{
			Req:        msg,
			HandleResp: c.handleResponse,
		})
	}

	return reqs
}
// handleResponse is the internal response handler used for the requests of
// this CFHeaders query.
//
// A response is accepted only if it is a cfheaders message answering a
// getcfheaders request with the same filter type and stop hash, the stop hash
// is one we asked for, and the contained headers agree with our checkpoints.
// An accepted response is delivered on headerChan and reported as finished and
// progressed. Every other outcome reports neither; a response that contradicts
// the checkpoints additionally gets the sending peer banned.
func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
	peerAddr string) query.Progress {

	// noProgress is what we report whenever the response cannot be used.
	noProgress := query.Progress{
		Finished:   false,
		Progressed: false,
	}

	// We are only looking for cfheaders messages.
	cfHeaders, isCFHeaders := resp.(*wire.MsgCFHeaders)
	if !isCFHeaders {
		return noProgress
	}

	// We sent a getcfheaders message, so that's what the response should
	// be compared against.
	getCFHeaders, isGetCFHeaders := req.(*wire.MsgGetCFHeaders)
	if !isGetCFHeaders {
		return noProgress
	}

	// The response has to match the query.
	if getCFHeaders.FilterType != cfHeaders.FilterType ||
		getCFHeaders.StopHash != cfHeaders.StopHash {

		return noProgress
	}

	// Bail out if we never requested a matching stop hash.
	idx, requested := c.stopHashes[cfHeaders.StopHash]
	if !requested {
		return noProgress
	}

	// The headers in the response are verified against the previous
	// checkpoint, which is the genesis header for the very first
	// checkpoint index.
	prev := &c.blockMgr.genesisHeader
	if idx > 0 {
		prev = c.checkpoints[idx-1]
	}

	// The index of the next checkpoint depends on whether the query was
	// able to cover a full maxCFCheckptsPerQuery checkpoints; otherwise it
	// is clamped to the last checkpoint.
	numCheckpoints := uint32(len(c.checkpoints))
	nextIdx := idx + maxCFCheckptsPerQuery - 1
	if nextIdx >= numCheckpoints {
		nextIdx = numCheckpoints - 1
	}
	next := c.checkpoints[nextIdx]

	if !verifyCheckpoint(prev, next, cfHeaders) {
		log.Warnf("Checkpoints at index %v don't match response!!!",
			idx)

		// The peer served headers that disagree with what we know to
		// be the best checkpoint, so ban it. That allows the query to
		// be re-allocated elsewhere.
		banErr := c.blockMgr.cfg.BanPeer(
			peerAddr, banman.InvalidFilterHeaderCheckpoint,
		)
		if banErr != nil {
			log.Errorf("Unable to ban peer %v: %v", peerAddr, banErr)
		}

		return noProgress
	}

	// The response matches both the query and the relevant checkpoint, so
	// deliver the verified headers on headerChan, unless we are asked to
	// quit first.
	select {
	case <-c.blockMgr.quit:
		return noProgress

	case c.headerChan <- cfHeaders:
	}

	// Report the query as finished, so the peer that answered it can move
	// on to the next query.
	return query.Progress{
		Finished:   true,
		Progressed: true,
	}
}
// getCheckpointedCFHeaders catches a filter header store up with the
// checkpoints we got from the network. It assumes that the filter header store
// matches the checkpoints up to the tip of the store.
func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
store *headerfs.FilterHeaderStore, fType wire.FilterType) {
// We keep going until we've caught up the filter header store with the
// latest known checkpoint.
curHeader, curHeight, err := store.ChainTip()
if err != nil {
panic(fmt.Sprintf("failed getting chaintip from filter "+
"store: %v", err))
}
initialFilterHeader := curHeader
log.Infof("Fetching set of checkpointed cfheaders filters from "+
"height=%v, hash=%v", curHeight, curHeader)
// The starting interval is the checkpoint index that we'll be starting
// from based on our current height in the filter header index.
startingInterval := curHeight / wire.CFCheckptInterval
log.Infof("Starting to query for cfheaders from "+
"checkpoint_interval=%v, checkpoints=%v", startingInterval,
len(checkpoints))
// We'll determine how many queries we'll make based on our starting
// interval and our set of checkpoints. Each query will attempt to fetch
// maxCFCheckptsPerQuery intervals worth of filter headers. If
// maxCFCheckptsPerQuery is not a factor of the number of checkpoint
// intervals to fetch, then an additional query will exist that spans
// the remaining checkpoint intervals.
numCheckpts := uint32(len(checkpoints)) - startingInterval
numQueries := (numCheckpts + maxCFCheckptsPerQuery - 1) / maxCFCheckptsPerQuery
queryMsgs := make([]wire.Message, 0, numQueries)
// We'll also create an additional set of maps that we'll use to
// re-order the responses as we get them in.
queryResponses := make(map[uint32]*wire.MsgCFHeaders, numQueries)
stopHashes := make(map[chainhash.Hash]uint32, numQueries)
// Generate all of the requests we'll be batching and space to store
// the responses. Also make a map of stophash to index to make it
// easier to match against incoming responses.
//
// TODO(roasbeef): extract to func to test
currentInterval := startingInterval
for currentInterval < uint32(len(checkpoints)) {
// Each checkpoint is spaced wire.CFCheckptInterval after the
// prior one, so we'll fetch headers in batches using the
// checkpoints as a guide. Our queries will consist of
// maxCFCheckptsPerQuery unless we don't have enough checkpoints
// to do so. In that case, our query will consist of whatever is
// left.
startHeightRange :=
(currentInterval * wire.CFCheckptInterval) + 1
nextInterval := currentInterval + maxCFCheckptsPerQuery
if nextInterval > uint32(len(checkpoints)) {
nextInterval = uint32(len(checkpoints))
}
endHeightRange := nextInterval * wire.CFCheckptInterval
log.Tracef("Checkpointed cfheaders request start_range=%v, "+
"end_range=%v", startHeightRange, endHeightRange)
// In order to fetch the range, we'll need the block header for
// the end of the height range.
stopHeader, err := b.cfg.BlockHeaders.FetchHeaderByHeight(
endHeightRange,
)
if err != nil {
panic(fmt.Sprintf("failed getting block header at "+