Skip to content

Commit 0997a77

Browse files
blockfetcher: add headerSizeMap according to CommitteeHistory
Signed-off-by: Ekaterina Pavlova <[email protected]>
1 parent b2e4c2c commit 0997a77

File tree

1 file changed

+55
-19
lines changed

1 file changed

+55
-19
lines changed

Diff for: pkg/services/blockfetcher/blockfetcher.go

+55-19
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ func (p poolWrapper) Close() error {
6565
return nil
6666
}
6767

68+
type indexedOID struct {
69+
Index int
70+
OID oid.ID
71+
}
72+
6873
// Service is a service that fetches blocks from NeoFS.
6974
type Service struct {
7075
// isActive denotes whether the service is working or in the process of shutdown.
@@ -75,15 +80,15 @@ type Service struct {
7580
operationMode OperationMode
7681

7782
stateRootInHeader bool
78-
// headerSize is the size of the header in bytes.
79-
headerSize int
83+
// headerSizeMap is a map of height to expected header size.
84+
headerSizeMap map[int]int
8085

8186
chain Ledger
8287
pool poolWrapper
8388
enqueue func(block bqueue.Queueable) error
8489
account *wallet.Account
8590

86-
oidsCh chan oid.ID
91+
oidsCh chan indexedOID
8792
// wg is a wait group for block downloaders.
8893
wg sync.WaitGroup
8994

@@ -101,7 +106,7 @@ type Service struct {
101106
shutdownCallback func()
102107

103108
// Depends on the OperationMode, the following functions are set to the appropriate functions.
104-
getFunc func(ctx context.Context, oid string) (io.ReadCloser, error)
109+
getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error)
105110
readFunc func(rc io.ReadCloser) (any, error)
106111
heightFunc func() uint32
107112
}
@@ -168,7 +173,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
168173
log: logger,
169174
cfg: cfg,
170175
operationMode: opt,
171-
headerSize: getHeaderSize(chain.GetConfig()),
176+
headerSizeMap: getHeaderSizeMap(chain.GetConfig()),
172177

173178
enqueue: put,
174179
account: account,
@@ -184,13 +189,14 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
184189
// * first full block of OIDs is processing by Downloader
185190
// * second full block of OIDs is available to be fetched by Downloader immediately
186191
// * third half-filled block of OIDs is being collected by OIDsFetcher.
187-
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
192+
oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize),
188193
}, nil
189194
}
190195

191-
func getHeaderSize(chain config.Blockchain) int {
192-
m := smartcontract.GetDefaultHonestNodeCount(int(chain.ValidatorsCount))
196+
func getHeaderSizeMap(chain config.Blockchain) map[int]int {
197+
headerSizeMap := make(map[int]int)
193198
vs, _ := keys.NewPublicKeysFromStrings(chain.StandbyCommittee)
199+
m := smartcontract.GetDefaultHonestNodeCount(int(chain.ValidatorsCount))
194200
verification, _ := smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(0)])
195201
b := block.Header{
196202
StateRootEnabled: chain.StateRootInHeader,
@@ -200,7 +206,22 @@ func getHeaderSize(chain config.Blockchain) int {
200206
InvocationScript: make([]byte, 66*m),
201207
},
202208
}
203-
return b.GetExpectedHeaderSize()
209+
headerSizeMap[0] = b.GetExpectedHeaderSize()
210+
211+
for height := range chain.CommitteeHistory {
212+
m = smartcontract.GetDefaultHonestNodeCount(chain.GetNumOfCNs(height))
213+
verification, _ = smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(height)])
214+
b = block.Header{
215+
StateRootEnabled: chain.StateRootInHeader,
216+
Timestamp: 42,
217+
Script: transaction.Witness{
218+
VerificationScript: verification,
219+
InvocationScript: make([]byte, 66*m),
220+
},
221+
}
222+
headerSizeMap[int(height)] = b.GetExpectedHeaderSize()
223+
}
224+
return headerSizeMap
204225
}
205226

206227
// Start runs the NeoFS BlockFetcher service.
@@ -288,11 +309,13 @@ func (bfs *Service) oidDownloader() {
288309
func (bfs *Service) blockDownloader() {
289310
defer bfs.wg.Done()
290311

291-
for blkOid := range bfs.oidsCh {
312+
for indexedOid := range bfs.oidsCh {
313+
index := indexedOid.Index
314+
blkOid := indexedOid.OID
292315
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
293316
defer cancel()
294317

295-
rc, err := bfs.getFunc(ctx, blkOid.String())
318+
rc, err := bfs.getFunc(ctx, blkOid.String(), index)
296319
if err != nil {
297320
if isContextCanceledErr(err) {
298321
return
@@ -358,15 +381,15 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
358381

359382
blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
360383
defer blockCancel()
361-
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
384+
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1)
362385
if err != nil {
363386
if isContextCanceledErr(err) {
364387
return nil
365388
}
366389
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
367390
}
368391

369-
err = bfs.streamBlockOIDs(oidsRC, int(skip))
392+
err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip))
370393
if err != nil {
371394
if isContextCanceledErr(err) {
372395
return nil
@@ -381,7 +404,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
381404
}
382405

383406
// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
384-
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
407+
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error {
385408
defer rc.Close()
386409
oidBytes := make([]byte, oid.Size)
387410
oidsProcessed := 0
@@ -408,7 +431,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
408431
select {
409432
case <-bfs.exiterToOIDDownloader:
410433
return nil
411-
case bfs.oidsCh <- oidBlock:
434+
case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}:
412435
}
413436

414437
oidsProcessed++
@@ -453,12 +476,14 @@ func (bfs *Service) fetchOIDsBySearch() error {
453476
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
454477
return nil
455478
}
479+
index := int(startIndex)
456480
for _, oid := range blockOids {
457481
select {
458482
case <-bfs.exiterToOIDDownloader:
459483
return nil
460-
case bfs.oidsCh <- oid:
484+
case bfs.oidsCh <- indexedOID{Index: index, OID: oid}:
461485
}
486+
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.
462487
}
463488
startIndex += batchSize
464489
}
@@ -592,7 +617,7 @@ func (bfs *Service) retry(action func() error) error {
592617
return err
593618
}
594619

595-
func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
620+
func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) {
596621
u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid))
597622
if err != nil {
598623
return nil, err
@@ -605,8 +630,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
605630
return rc, err
606631
}
607632

608-
func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) {
609-
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize))
633+
func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) {
634+
nearestHeight := 0
635+
for h := range bfs.headerSizeMap {
636+
if h <= height && h > nearestHeight {
637+
nearestHeight = h
638+
}
639+
if nearestHeight >= height {
640+
break
641+
}
642+
}
643+
644+
size := bfs.headerSizeMap[nearestHeight]
645+
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size))
610646
if err != nil {
611647
return nil, err
612648
}

0 commit comments

Comments
 (0)