Skip to content

Commit 8d728b4

Browse files
authored
Merge pull request #3789 from nspcc-dev/headers
Headers fetching via NeoFS BlockFetcher service
2 parents 3fa02d0 + 1ad62ef commit 8d728b4

17 files changed

+553
-137
lines changed

internal/fakechain/fakechain.go

+18
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ type FakeStateSync struct {
4949
AddMPTNodesFunc func(nodes [][]byte) error
5050
}
5151

52+
// HeaderHeight returns the height of the latest stored header.
53+
func (s *FakeStateSync) HeaderHeight() uint32 {
54+
return 0
55+
}
56+
5257
// NewFakeChain returns a new FakeChain structure.
5358
func NewFakeChain() *FakeChain {
5459
return NewFakeChainWithCustomCfg(nil)
@@ -447,6 +452,9 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
447452
// NeedHeaders implements the StateSync interface.
448453
func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }
449454

455+
// NeedBlocks implements the StateSync interface.
456+
func (s *FakeStateSync) NeedBlocks() bool { return false }
457+
450458
// NeedMPTNodes implements the StateSync interface.
451459
func (s *FakeStateSync) NeedMPTNodes() bool {
452460
panic("TODO")
@@ -464,3 +472,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
464472
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
465473
panic("TODO")
466474
}
475+
476+
// GetConfig implements the StateSync interface.
477+
func (s *FakeStateSync) GetConfig() config.Blockchain {
478+
panic("TODO")
479+
}
480+
481+
// SetOnStageChanged implements the StateSync interface.
482+
func (s *FakeStateSync) SetOnStageChanged(func()) {
483+
panic("TODO")
484+
}

pkg/config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
6464
return Blockchain{
6565
ProtocolConfiguration: c.ProtocolConfiguration,
6666
Ledger: c.ApplicationConfiguration.Ledger,
67+
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
6768
}
6869
}
6970

pkg/config/ledger_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ type Ledger struct {
3131
type Blockchain struct {
3232
ProtocolConfiguration
3333
Ledger
34+
NeoFSBlockFetcher
3435
}

pkg/config/protocol_config.go

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ type (
4949
P2PSigExtensions bool `yaml:"P2PSigExtensions"`
5050
// P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic.
5151
P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"`
52+
// NeoFSStateSyncExtensions enables state data exchange logic via NeoFS.
53+
NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"`
5254
// ReservedAttributes allows to have reserved attributes range for experimental or private purposes.
5355
ReservedAttributes bool `yaml:"ReservedAttributes"`
5456

pkg/consensus/consensus.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type Ledger interface {
5858

5959
// BlockQueuer is an interface to the block queue manager sufficient for Service.
6060
type BlockQueuer interface {
61-
PutBlock(block *coreb.Block) error
61+
Put(queueable *coreb.Block) error
6262
}
6363

6464
// Service represents a consensus instance.
@@ -623,7 +623,7 @@ func (s *service) processBlock(b dbft.Block[util.Uint256]) error {
623623
bb := &b.(*neoBlock).Block
624624
bb.Script = *(s.getBlockWitness(bb))
625625

626-
if err := s.BlockQueue.PutBlock(bb); err != nil {
626+
if err := s.BlockQueue.Put(bb); err != nil {
627627
// The block might already be added via the regular network
628628
// interaction.
629629
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {

pkg/consensus/consensus_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ type testBlockQueuer struct {
528528
var _ = BlockQueuer(testBlockQueuer{})
529529

530530
// PutBlock implements BlockQueuer interface.
531-
func (bq testBlockQueuer) PutBlock(b *coreb.Block) error {
531+
func (bq testBlockQueuer) Put(b *coreb.Block) error {
532532
return bq.bc.AddBlock(b)
533533
}
534534

pkg/core/block/block.go

+7
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ type auxBlockIn struct {
5151
Transactions []json.RawMessage `json:"tx"`
5252
}
5353

54+
// GetIndex returns the index of the block. This method should be used
55+
// for interfaces only. As generics don't support structural types
56+
// ref. golang/go#51259.
57+
func (b *Block) GetIndex() uint32 {
58+
return b.Index
59+
}
60+
5461
// ComputeMerkleRoot computes Merkle tree root hash based on actual block's data.
5562
func (b *Block) ComputeMerkleRoot() util.Uint256 {
5663
hashes := make([]util.Uint256, len(b.Transactions))

pkg/core/block/header.go

+28
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
1111
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
1212
"github.com/nspcc-dev/neo-go/pkg/io"
13+
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
1314
"github.com/nspcc-dev/neo-go/pkg/util"
1415
)
1516

@@ -80,6 +81,13 @@ type baseAux struct {
8081
Witnesses []transaction.Witness `json:"witnesses"`
8182
}
8283

84+
// GetIndex returns the index of the block. This method should be used
85+
// for interfaces only. As generics don't support structural types
86+
// ref. golang/go#51259.
87+
func (b *Header) GetIndex() uint32 {
88+
return b.Index
89+
}
90+
8391
// Hash returns the hash of the block. Notice that it is cached internally,
8492
// so no matter how you change the [Header] after the first invocation of this
8593
// method it won't change. To get an updated hash in case you're changing
@@ -228,3 +236,23 @@ func (b *Header) UnmarshalJSON(data []byte) error {
228236
}
229237
return nil
230238
}
239+
240+
// GetExpectedHeaderSize returns the expected Header size with the given number of validators.
241+
func GetExpectedHeaderSize(stateRootInHeader bool, numOfValidators int) int {
242+
m := smartcontract.GetDefaultHonestNodeCount(numOfValidators)
243+
// expectedHeaderSizeWithEmptyWitness contains 2 bytes for zero-length (new(Header)).Script.Invocation/Verification
244+
// InvocationScript:
245+
// 64 is the size of the default signature length + 2 bytes length and opcode
246+
// 2 = 1 push opcode + 1 length
247+
// VerifcationScript:
248+
// m = 1 bytes
249+
// 33 = 1 push opcode + 1 length + 33 bytes for public key
250+
// n = 1 bytes
251+
// 5 for SYSCALL
252+
size := expectedHeaderSizeWithEmptyWitness + (1+1+64)*m + 2 + numOfValidators*(1+1+33) + 2 + 5
253+
254+
if stateRootInHeader {
255+
size += util.Uint256Size
256+
}
257+
return size
258+
}

pkg/core/blockchain.go

+13
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,9 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
299299
log.Info("MaxValidUntilBlockIncrement is not set or wrong, using default value",
300300
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
301301
}
302+
if cfg.P2PStateExchangeExtensions && cfg.NeoFSStateSyncExtensions {
303+
return nil, errors.New("P2PStateExchangeExtensions and NeoFSStateSyncExtensions cannot be enabled simultaneously")
304+
}
302305
if cfg.P2PStateExchangeExtensions {
303306
if !cfg.StateRootInHeader {
304307
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
@@ -312,6 +315,16 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
312315
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
313316
}
314317
}
318+
if cfg.NeoFSStateSyncExtensions {
319+
if !cfg.NeoFSBlockFetcher.Enabled {
320+
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
321+
}
322+
if cfg.StateSyncInterval <= 0 {
323+
cfg.StateSyncInterval = defaultStateSyncInterval
324+
log.Info("StateSyncInterval is not set or wrong, using default value",
325+
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
326+
}
327+
}
315328
if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks {
316329
return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not")
317330
}

pkg/core/statesync/module.go

+67-1
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,15 @@ type Module struct {
9393
billet *mpt.Billet
9494

9595
jumpCallback func(p uint32) error
96+
97+
// stageChangedCallback is an optional callback that is triggered whenever
98+
// the sync stage changes.
99+
stageChangedCallback func()
96100
}
97101

98102
// NewModule returns new instance of statesync module.
99103
func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
100-
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
104+
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) && !bc.GetConfig().NeoFSStateSyncExtensions {
101105
return &Module{
102106
dao: s,
103107
bc: bc,
@@ -120,7 +124,13 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si
120124
// Init initializes state sync module for the current chain's height with given
121125
// callback for MPT nodes requests.
122126
func (s *Module) Init(currChainHeight uint32) error {
127+
oldStage := s.syncStage
123128
s.lock.Lock()
129+
defer func() {
130+
if s.syncStage != oldStage {
131+
s.notifyStageChanged()
132+
}
133+
}()
124134
defer s.lock.Unlock()
125135

126136
if s.syncStage != none {
@@ -176,6 +186,20 @@ func (s *Module) Init(currChainHeight uint32) error {
176186
return s.defineSyncStage()
177187
}
178188

189+
// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
190+
func (s *Module) SetOnStageChanged(cb func()) {
191+
s.lock.Lock()
192+
defer s.lock.Unlock()
193+
s.stageChangedCallback = cb
194+
}
195+
196+
// notifyStageChanged triggers stage change callback if it's set.
197+
func (s *Module) notifyStageChanged() {
198+
if s.stageChangedCallback != nil {
199+
s.stageChangedCallback()
200+
}
201+
}
202+
179203
// TemporaryPrefix accepts current storage prefix and returns prefix
180204
// to use for storing intermediate items during synchronization.
181205
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
@@ -287,7 +311,13 @@ func (s *Module) getLatestSavedBlock(p uint32) uint32 {
287311

288312
// AddHeaders validates and adds specified headers to the chain.
289313
func (s *Module) AddHeaders(hdrs ...*block.Header) error {
314+
oldStage := s.syncStage
290315
s.lock.Lock()
316+
defer func() {
317+
if s.syncStage != oldStage {
318+
s.notifyStageChanged()
319+
}
320+
}()
291321
defer s.lock.Unlock()
292322

293323
if s.syncStage != initialized {
@@ -306,7 +336,13 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error {
306336

307337
// AddBlock verifies and saves block skipping executable scripts.
308338
func (s *Module) AddBlock(block *block.Block) error {
339+
oldStage := s.syncStage
309340
s.lock.Lock()
341+
defer func() {
342+
if s.syncStage != oldStage {
343+
s.notifyStageChanged()
344+
}
345+
}()
310346
defer s.lock.Unlock()
311347

312348
if s.syncStage&headersSynced == 0 || s.syncStage&blocksSynced != 0 {
@@ -359,7 +395,13 @@ func (s *Module) AddBlock(block *block.Block) error {
359395
// AddMPTNodes tries to add provided set of MPT nodes to the MPT billet if they are
360396
// not yet collected.
361397
func (s *Module) AddMPTNodes(nodes [][]byte) error {
398+
oldStage := s.syncStage
362399
s.lock.Lock()
400+
defer func() {
401+
if s.syncStage != oldStage {
402+
s.notifyStageChanged()
403+
}
404+
}()
363405
defer s.lock.Unlock()
364406

365407
if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 {
@@ -425,6 +467,12 @@ func (s *Module) restoreNode(n mpt.Node) error {
425467
// If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller
426468
// should take care of it.
427469
func (s *Module) checkSyncIsCompleted() {
470+
oldStage := s.syncStage
471+
defer func() {
472+
if s.syncStage != oldStage {
473+
s.notifyStageChanged()
474+
}
475+
}()
428476
if s.syncStage != headersSynced|mptSynced|blocksSynced {
429477
return
430478
}
@@ -484,6 +532,14 @@ func (s *Module) NeedMPTNodes() bool {
484532
return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0
485533
}
486534

535+
// NeedBlocks returns whether the module hasn't completed blocks synchronisation.
536+
func (s *Module) NeedBlocks() bool {
537+
s.lock.RLock()
538+
defer s.lock.RUnlock()
539+
540+
return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0
541+
}
542+
487543
// Traverse traverses local MPT nodes starting from the specified root down to its
488544
// children calling `process` for each serialised node until stop condition is satisfied.
489545
func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error {
@@ -508,3 +564,13 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
508564

509565
return s.mptpool.GetBatch(limit)
510566
}
567+
568+
// HeaderHeight returns the height of the latest stored header.
569+
func (s *Module) HeaderHeight() uint32 {
570+
return s.bc.HeaderHeight()
571+
}
572+
573+
// GetConfig returns current blockchain configuration.
574+
func (s *Module) GetConfig() config.Blockchain {
575+
return s.bc.GetConfig()
576+
}

0 commit comments

Comments
 (0)