Skip to content

Commit e62e4d2

Browse files
network: integrate state sync module with blockfetcher
Close #3574 Signed-off-by: Ekaterina Pavlova <[email protected]> fix Signed-off-by: Ekaterina Pavlova <[email protected]>
1 parent 4af6927 commit e62e4d2

File tree

11 files changed

+234
-34
lines changed

11 files changed

+234
-34
lines changed

Diff for: internal/fakechain/fakechain.go

+12
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,8 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
452452
// NeedHeaders implements the StateSync interface.
453453
func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }
454454

455+
func (s *FakeStateSync) NeedBlocks() bool { return false }
456+
455457
// NeedMPTNodes implements the StateSync interface.
456458
func (s *FakeStateSync) NeedMPTNodes() bool {
457459
panic("TODO")
@@ -469,3 +471,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
469471
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
470472
panic("TODO")
471473
}
474+
475+
// GetConfig implements the StateSync interface.
476+
func (s *FakeStateSync) GetConfig() config.Blockchain {
477+
panic("TODO")
478+
}
479+
480+
// SetOnStageChanged implements the StateSync interface.
481+
func (s *FakeStateSync) SetOnStageChanged(func()) {
482+
panic("TODO")
483+
}

Diff for: pkg/config/blockfetcher_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct {
2121
BQueueSize int `yaml:"BQueueSize"`
2222
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
2323
IndexFileSize uint32 `yaml:"IndexFileSize"`
24+
BlocksOnly bool `yaml:"BlocksOnly"`
2425
}
2526

2627
// Validate checks NeoFSBlockFetcher for internal consistency and ensures

Diff for: pkg/config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
6464
return Blockchain{
6565
ProtocolConfiguration: c.ProtocolConfiguration,
6666
Ledger: c.ApplicationConfiguration.Ledger,
67+
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
6768
}
6869
}
6970

Diff for: pkg/config/ledger_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ type Ledger struct {
2929
type Blockchain struct {
3030
ProtocolConfiguration
3131
Ledger
32+
NeoFSBlockFetcher
3233
}

Diff for: pkg/core/blockchain.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,9 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
285285
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
286286
}
287287
if cfg.P2PStateExchangeExtensions {
288-
if !cfg.StateRootInHeader {
289-
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
290-
}
288+
//if !cfg.StateRootInHeader && !cfg.NeoFSBlockFetcher.Enabled {
289+
// return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
290+
//}
291291
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
292292
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
293293
}

Diff for: pkg/core/statesync/module.go

+73-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ import (
3737
// stateSyncStage is a type of state synchronisation stage.
3838
type stateSyncStage uint8
3939

40+
// OnStageChanged is a callback function that is called when state sync stage is changed.
41+
type OnStageChanged func()
42+
4043
const (
4144
// inactive means that state exchange is disabled by the protocol configuration.
4245
// Can't be combined with other states.
@@ -93,6 +96,10 @@ type Module struct {
9396
billet *mpt.Billet
9497

9598
jumpCallback func(p uint32) error
99+
100+
// stageCB is an optional callback that is triggered whenever
101+
// the sync stage changes.
102+
stageCB func()
96103
}
97104

98105
// NewModule returns new instance of statesync module.
@@ -120,6 +127,7 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si
120127
// Init initializes state sync module for the current chain's height with given
121128
// callback for MPT nodes requests.
122129
func (s *Module) Init(currChainHeight uint32) error {
130+
oldStage := s.syncStage
123131
s.lock.Lock()
124132
defer s.lock.Unlock()
125133

@@ -131,6 +139,9 @@ func (s *Module) Init(currChainHeight uint32) error {
131139
if p < 2*s.syncInterval {
132140
// chain is too low to start state exchange process, use the standard sync mechanism
133141
s.syncStage = inactive
142+
if s.syncStage != oldStage {
143+
s.notifyStageChanged()
144+
}
134145
return nil
135146
}
136147
pOld, err := s.dao.GetStateSyncPoint()
@@ -142,6 +153,9 @@ func (s *Module) Init(currChainHeight uint32) error {
142153
// chain has already been synchronised up to old state sync point and regular blocks processing was started.
143154
// Current block height is enough to start regular blocks processing.
144155
s.syncStage = inactive
156+
if s.syncStage != oldStage {
157+
s.notifyStageChanged()
158+
}
145159
return nil
146160
}
147161
if err == nil {
@@ -172,10 +186,26 @@ func (s *Module) Init(currChainHeight uint32) error {
172186
s.log.Info("try to sync state for the latest state synchronisation point",
173187
zap.Uint32("point", p),
174188
zap.Uint32("evaluated chain's blockHeight", currChainHeight))
175-
189+
if s.syncStage != oldStage {
190+
s.notifyStageChanged()
191+
}
176192
return s.defineSyncStage()
177193
}
178194

195+
// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
196+
func (s *Module) SetOnStageChanged(cb func()) {
197+
s.lock.Lock()
198+
defer s.lock.Unlock()
199+
s.stageCB = cb
200+
}
201+
202+
// notifyStageChanged triggers stage callback if it's set.
203+
func (s *Module) notifyStageChanged() {
204+
if s.stageCB != nil {
205+
go s.stageCB()
206+
}
207+
}
208+
179209
// TemporaryPrefix accepts current storage prefix and returns prefix
180210
// to use for storing intermediate items during synchronization.
181211
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
@@ -192,12 +222,17 @@ func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
192222
// defineSyncStage sequentially checks and sets sync state process stage after Module
193223
// initialization. It also performs initialization of MPT Billet if necessary.
194224
func (s *Module) defineSyncStage() error {
225+
oldStage := s.syncStage
195226
// check headers sync stage first
196227
ltstHeaderHeight := s.bc.HeaderHeight()
197228
if ltstHeaderHeight > s.syncPoint {
198229
s.syncStage = headersSynced
199230
s.log.Info("headers are in sync",
200231
zap.Uint32("headerHeight", s.bc.HeaderHeight()))
232+
if s.syncStage != oldStage {
233+
s.notifyStageChanged()
234+
oldStage = s.syncStage
235+
}
201236
}
202237

203238
// check blocks sync stage
@@ -206,13 +241,21 @@ func (s *Module) defineSyncStage() error {
206241
s.syncStage |= blocksSynced
207242
s.log.Info("blocks are in sync",
208243
zap.Uint32("blockHeight", s.blockHeight))
244+
if s.syncStage != oldStage {
245+
s.notifyStageChanged()
246+
oldStage = s.syncStage
247+
}
209248
}
210249

211250
// check MPT sync stage
212251
if s.blockHeight > s.syncPoint {
213252
s.syncStage |= mptSynced
214253
s.log.Info("MPT is in sync",
215254
zap.Uint32("stateroot height", s.stateMod.CurrentLocalHeight()))
255+
if s.syncStage != oldStage {
256+
s.notifyStageChanged()
257+
oldStage = s.syncStage
258+
}
216259
} else if s.syncStage&headersSynced != 0 {
217260
header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1))
218261
if err != nil {
@@ -255,12 +298,19 @@ func (s *Module) defineSyncStage() error {
255298
s.syncStage |= mptSynced
256299
s.log.Info("MPT is in sync",
257300
zap.Uint32("stateroot height", s.syncPoint))
301+
if s.syncStage != oldStage {
302+
s.notifyStageChanged()
303+
oldStage = s.syncStage
304+
}
258305
}
259306
}
260307

261308
if s.syncStage == headersSynced|blocksSynced|mptSynced {
262309
s.log.Info("state is in sync, starting regular blocks processing")
263310
s.syncStage = inactive
311+
if s.syncStage != oldStage {
312+
s.notifyStageChanged()
313+
}
264314
}
265315
return nil
266316
}
@@ -306,6 +356,7 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error {
306356

307357
// AddBlock verifies and saves block skipping executable scripts.
308358
func (s *Module) AddBlock(block *block.Block) error {
359+
oldStage := s.syncStage
309360
s.lock.Lock()
310361
defer s.lock.Unlock()
311362

@@ -351,6 +402,9 @@ func (s *Module) AddBlock(block *block.Block) error {
351402
s.syncStage |= blocksSynced
352403
s.log.Info("blocks are in sync",
353404
zap.Uint32("blockHeight", s.blockHeight))
405+
if s.syncStage != oldStage {
406+
s.notifyStageChanged()
407+
}
354408
s.checkSyncIsCompleted()
355409
}
356410
return nil
@@ -425,6 +479,7 @@ func (s *Module) restoreNode(n mpt.Node) error {
425479
// If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller
426480
// should take care of it.
427481
func (s *Module) checkSyncIsCompleted() {
482+
oldStage := s.syncStage
428483
if s.syncStage != headersSynced|mptSynced|blocksSynced {
429484
return
430485
}
@@ -436,6 +491,10 @@ func (s *Module) checkSyncIsCompleted() {
436491
}
437492
s.syncStage = inactive
438493
s.dispose()
494+
495+
if s.syncStage != oldStage {
496+
s.notifyStageChanged()
497+
}
439498
}
440499

441500
func (s *Module) dispose() {
@@ -492,6 +551,14 @@ func (s *Module) NeedMPTNodes() bool {
492551
return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0
493552
}
494553

554+
// NeedBlocks returns whether the module hasn't completed blocks synchronisation.
555+
func (s *Module) NeedBlocks() bool {
556+
s.lock.RLock()
557+
defer s.lock.RUnlock()
558+
559+
return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0
560+
}
561+
495562
// Traverse traverses local MPT nodes starting from the specified root down to its
496563
// children calling `process` for each serialised node until stop condition is satisfied.
497564
func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error {
@@ -516,3 +583,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
516583

517584
return s.mptpool.GetBatch(limit)
518585
}
586+
587+
// GetConfig returns current blockchain configuration.
588+
func (s *Module) GetConfig() config.Blockchain {
589+
return s.bc.GetConfig()
590+
}

Diff for: pkg/network/bqueue/queue.go

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync/atomic"
66
"time"
77

8+
"github.com/nspcc-dev/neo-go/pkg/config"
89
"github.com/nspcc-dev/neo-go/pkg/core/block"
910
"go.uber.org/zap"
1011
)
@@ -15,6 +16,7 @@ type Blockqueuer interface {
1516
AddHeaders(...*block.Header) error
1617
BlockHeight() uint32
1718
HeaderHeight() uint32
19+
GetConfig() config.Blockchain
1820
}
1921

2022
// OperationMode is the mode of operation for the block queue.

0 commit comments

Comments
 (0)