Skip to content

Commit b2c9187

Browse files
network: integrate state sync module with blockfetcher
Close #3574 Signed-off-by: Ekaterina Pavlova <[email protected]>
1 parent 4af6927 commit b2c9187

File tree

11 files changed

+115
-36
lines changed

11 files changed

+115
-36
lines changed

internal/fakechain/fakechain.go

+5
Original file line numberDiff line numberDiff line change
@@ -469,3 +469,8 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
469469
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
470470
panic("TODO")
471471
}
472+
473+
// GetConfig implements the StateSync interface.
474+
func (s *FakeStateSync) GetConfig() config.Blockchain {
475+
panic("TODO")
476+
}

pkg/config/blockfetcher_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct {
2121
BQueueSize int `yaml:"BQueueSize"`
2222
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
2323
IndexFileSize uint32 `yaml:"IndexFileSize"`
24+
BlocksOnly bool `yaml:"BlocksOnly"`
2425
}
2526

2627
// Validate checks NeoFSBlockFetcher for internal consistency and ensures

pkg/config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
6464
return Blockchain{
6565
ProtocolConfiguration: c.ProtocolConfiguration,
6666
Ledger: c.ApplicationConfiguration.Ledger,
67+
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
6768
}
6869
}
6970

pkg/config/ledger_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ type Ledger struct {
2929
type Blockchain struct {
3030
ProtocolConfiguration
3131
Ledger
32+
NeoFSBlockFetcher
3233
}

pkg/core/blockchain.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
285285
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
286286
}
287287
if cfg.P2PStateExchangeExtensions {
288-
if !cfg.StateRootInHeader {
288+
if !cfg.StateRootInHeader && !cfg.NeoFSBlockFetcher.Enabled {
289289
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
290290
}
291291
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {

pkg/core/statesync/module.go

+5
Original file line numberDiff line numberDiff line change
@@ -516,3 +516,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
516516

517517
return s.mptpool.GetBatch(limit)
518518
}
519+
520+
// GetConfig returns current blockchain configuration.
521+
func (s *Module) GetConfig() config.Blockchain {
522+
return s.bc.GetConfig()
523+
}

pkg/network/bqueue/queue.go

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync/atomic"
66
"time"
77

8+
"github.com/nspcc-dev/neo-go/pkg/config"
89
"github.com/nspcc-dev/neo-go/pkg/core/block"
910
"go.uber.org/zap"
1011
)
@@ -15,6 +16,7 @@ type Blockqueuer interface {
1516
AddHeaders(...*block.Header) error
1617
BlockHeight() uint32
1718
HeaderHeight() uint32
19+
GetConfig() config.Blockchain
1820
}
1921

2022
// OperationMode is the mode of operation for the block queue.

pkg/network/server.go

+52-21
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ type (
109109
notaryRequestPool *mempool.Pool
110110
extensiblePool *extpool.Pool
111111
notaryFeer NotaryFeer
112+
headerFetcher *blockfetcher.Service
112113
blockFetcher *blockfetcher.Service
113114

114115
serviceLock sync.RWMutex
@@ -227,10 +228,23 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
227228
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
228229
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
229230
}
230-
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
231-
var err error
232-
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
233-
sync.OnceFunc(func() { close(s.blockFetcherFin) }))
231+
var (
232+
err error
233+
bq bqueue.Blockqueuer = s.chain
234+
)
235+
if s.config.P2PStateExchangeExtensions {
236+
bq = s.stateSync
237+
}
238+
s.bFetcherQueue = bqueue.New(bq, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
239+
s.headerFetcher, err = blockfetcher.New(bq, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader,
240+
func() {
241+
s.log.Info("NeoFS BlockFetcher finished headers downloading")
242+
})
243+
if err != nil {
244+
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
245+
}
246+
s.NeoFSBlockFetcherCfg.BlocksOnly = true
247+
s.blockFetcher, err = blockfetcher.New(bq, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, sync.OnceFunc(func() { close(s.blockFetcherFin) }))
234248
if err != nil {
235249
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
236250
}
@@ -311,12 +325,6 @@ func (s *Server) Start() {
311325
go s.bQueue.Run()
312326
go s.bSyncQueue.Run()
313327
go s.bFetcherQueue.Run()
314-
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
315-
err := s.blockFetcher.Start()
316-
if err != nil {
317-
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
318-
}
319-
}
320328
for _, tr := range s.transports {
321329
go tr.Accept()
322330
}
@@ -333,7 +341,12 @@ func (s *Server) Shutdown() {
333341
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
334342
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
335343
s.bFetcherQueue.Discard()
336-
s.blockFetcher.Shutdown()
344+
if s.headerFetcher.IsActive() {
345+
s.headerFetcher.Shutdown()
346+
}
347+
if s.blockFetcher.IsActive() {
348+
s.blockFetcher.Shutdown()
349+
}
337350
}
338351
for _, tr := range s.transports {
339352
tr.Close()
@@ -732,7 +745,7 @@ func (s *Server) IsInSync() bool {
732745
var peersNumber int
733746
var notHigher int
734747

735-
if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
748+
if s.stateSync.IsActive() || s.headerFetcher.IsActive() || s.blockFetcher.IsActive() {
736749
return false
737750
}
738751

@@ -792,7 +805,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
792805

793806
// handleBlockCmd processes the block received from its peer.
794807
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
795-
if s.blockFetcher.IsActive() {
808+
if s.headerFetcher.IsActive() || s.blockFetcher.IsActive() {
796809
return nil
797810
}
798811
if s.stateSync.IsActive() {
@@ -815,15 +828,23 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
815828
}
816829

817830
func (s *Server) requestBlocksOrHeaders(p Peer) error {
818-
if s.blockFetcher.IsActive() {
819-
return nil
820-
}
821831
if s.stateSync.NeedHeaders() {
822-
if s.chain.HeaderHeight() < p.LastBlockIndex() {
832+
if s.chain.HeaderHeight() < p.LastBlockIndex() || s.NeoFSBlockFetcherCfg.Enabled {
823833
return s.requestHeaders(p)
824834
}
825835
return nil
826836
}
837+
if s.headerFetcher.IsActive() {
838+
s.headerFetcher.Shutdown()
839+
}
840+
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
841+
if ((s.stateSync.IsActive() && s.headerFetcher.IsShutdown()) || !s.stateSync.IsActive()) && !s.blockFetcher.IsShutdown() {
842+
if err := s.blockFetcher.Start(); err != nil {
843+
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
844+
}
845+
}
846+
}
847+
827848
var (
828849
bq bqueue.Blockqueuer = s.chain
829850
requestMPTNodes bool
@@ -847,6 +868,16 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {
847868

848869
// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
849870
func (s *Server) requestHeaders(p Peer) error {
871+
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
872+
if !s.headerFetcher.IsShutdown() {
873+
err := s.headerFetcher.Start()
874+
if err != nil {
875+
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
876+
} else {
877+
return err
878+
}
879+
}
880+
}
850881
pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader)
851882
return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl))
852883
}
@@ -1136,7 +1167,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error
11361167

11371168
// handleHeadersCmd processes headers payload.
11381169
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
1139-
if s.blockFetcher.IsActive() {
1170+
if s.headerFetcher.IsActive() {
11401171
return nil
11411172
}
11421173
return s.stateSync.AddHeaders(h.Hdrs...)
@@ -1335,6 +1366,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
13351366
// 2. Send requests for chunk in increasing order.
13361367
// 3. After all requests have been sent, request random height.
13371368
func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
1369+
if s.blockFetcher.IsActive() {
1370+
return nil
1371+
}
13381372
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
13391373
lq, capLeft := s.bQueue.LastQueued()
13401374
if capLeft == 0 {
@@ -1468,9 +1502,6 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
14681502
}
14691503

14701504
func (s *Server) tryInitStateSync() {
1471-
if s.blockFetcher.IsActive() {
1472-
return
1473-
}
14741505
if !s.stateSync.IsActive() {
14751506
s.bSyncQueue.Discard()
14761507
return

pkg/network/state_sync.go

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package network
22

33
import (
4+
"github.com/nspcc-dev/neo-go/pkg/config"
45
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
56
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
67
"github.com/nspcc-dev/neo-go/pkg/util"
@@ -17,4 +18,5 @@ type StateSync interface {
1718
NeedHeaders() bool
1819
NeedMPTNodes() bool
1920
Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error
21+
GetConfig() config.Blockchain
2022
}

pkg/services/blockfetcher/blockfetcher.go

+31-9
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636
type Ledger interface {
3737
GetConfig() config.Blockchain
3838
BlockHeight() uint32
39+
HeaderHeight() uint32
3940
}
4041

4142
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
@@ -52,15 +53,18 @@ func (p poolWrapper) Close() error {
5253
// Service is a service that fetches blocks from NeoFS.
5354
type Service struct {
5455
// isActive denotes whether the service is working or in the process of shutdown.
55-
isActive atomic.Bool
56+
isActive atomic.Bool
57+
// isShutdown denotes whether the service is shutdown after running.
58+
isShutdown atomic.Bool
5659
log *zap.Logger
5760
cfg config.NeoFSBlockFetcher
5861
stateRootInHeader bool
5962

60-
chain Ledger
61-
pool poolWrapper
62-
enqueueBlock func(*block.Block) error
63-
account *wallet.Account
63+
chain Ledger
64+
pool poolWrapper
65+
enqueueBlock func(*block.Block) error
66+
enqueueHeader func(*block.Header) error
67+
account *wallet.Account
6468

6569
oidsCh chan oid.ID
6670
// wg is a wait group for block downloaders.
@@ -81,7 +85,7 @@ type Service struct {
8185
}
8286

8387
// New creates a new BlockFetcher Service.
84-
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) {
88+
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, putHeader func(*block.Header) error, shutdownCallback func()) (*Service, error) {
8589
var (
8690
account *wallet.Account
8791
err error
@@ -143,6 +147,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
143147
cfg: cfg,
144148

145149
enqueueBlock: putBlock,
150+
enqueueHeader: putHeader,
146151
account: account,
147152
stateRootInHeader: chain.GetConfig().StateRootInHeader,
148153
shutdownCallback: shutdownCallback,
@@ -162,7 +167,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
162167

163168
// Start runs the NeoFS BlockFetcher service.
164169
func (bfs *Service) Start() error {
165-
if !bfs.isActive.CompareAndSwap(false, true) {
170+
if !bfs.isActive.CompareAndSwap(false, true) || bfs.IsShutdown() {
166171
return nil
167172
}
168173
bfs.log.Info("starting NeoFS BlockFetcher service")
@@ -262,7 +267,11 @@ func (bfs *Service) blockDownloader() {
262267
case <-bfs.ctx.Done():
263268
return
264269
default:
265-
err = bfs.enqueueBlock(b)
270+
if !bfs.cfg.BlocksOnly {
271+
err = bfs.enqueueHeader(&b.Header)
272+
} else {
273+
err = bfs.enqueueBlock(b)
274+
}
266275
if err != nil {
267276
bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err))
268277
bfs.stopService(true)
@@ -275,6 +284,9 @@ func (bfs *Service) blockDownloader() {
275284
// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first.
276285
func (bfs *Service) fetchOIDsFromIndexFiles() error {
277286
h := bfs.chain.BlockHeight()
287+
if !bfs.cfg.BlocksOnly {
288+
h = bfs.chain.HeaderHeight()
289+
}
278290
startIndex := h / bfs.cfg.IndexFileSize
279291
skip := h % bfs.cfg.IndexFileSize
280292

@@ -425,7 +437,7 @@ func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) {
425437
// block OIDs search, cancels all in-progress downloading operations and waits
426438
// until all service routines finish their work.
427439
func (bfs *Service) Shutdown() {
428-
if !bfs.IsActive() {
440+
if !bfs.IsActive() || bfs.IsShutdown() {
429441
return
430442
}
431443
bfs.stopService(true)
@@ -444,13 +456,17 @@ func (bfs *Service) stopService(force bool) {
444456
// exiter is a routine that is listening to a quitting signal and manages graceful
445457
// Service shutdown process.
446458
func (bfs *Service) exiter() {
459+
if !bfs.isActive.Load() {
460+
return
461+
}
447462
// Closing signal may come from anyone, but only once.
448463
force := <-bfs.quit
449464
bfs.log.Info("shutting down NeoFS BlockFetcher service",
450465
zap.Bool("force", force),
451466
)
452467

453468
bfs.isActive.CompareAndSwap(true, false)
469+
bfs.isShutdown.CompareAndSwap(false, true)
454470
// Cansel all pending OIDs/blocks downloads in case if shutdown requested by user
455471
// or caused by downloading error.
456472
if force {
@@ -482,6 +498,12 @@ func (bfs *Service) IsActive() bool {
482498
return bfs.isActive.Load()
483499
}
484500

501+
// IsShutdown returns true if the NeoFS BlockFetcher service is completely shutdown.
502+
// The service can not be started again.
503+
func (bfs *Service) IsShutdown() bool {
504+
return bfs.isShutdown.Load()
505+
}
506+
485507
// retry function with exponential backoff.
486508
func (bfs *Service) retry(action func() error) error {
487509
var (

0 commit comments

Comments
 (0)