Skip to content

Commit d94eb81

Browse files
blockfetcher: add headerSizeMap to GetRange headers accordingly
Signed-off-by: Ekaterina Pavlova <[email protected]>
1 parent ee1989f commit d94eb81

File tree

1 file changed

+43
-18
lines changed

1 file changed

+43
-18
lines changed

pkg/services/blockfetcher/blockfetcher.go

+43-18
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ func (p poolWrapper) Close() error {
6161
return nil
6262
}
6363

64+
type indexedOID struct {
65+
Index int
66+
OID oid.ID
67+
}
68+
6469
// Service is a service that fetches blocks from NeoFS.
6570
type Service struct {
6671
// isActive denotes whether the service is working or in the process of shutdown.
@@ -71,15 +76,15 @@ type Service struct {
7176
operationMode OperationMode
7277

7378
stateRootInHeader bool
74-
// headerSize is the size of the header in bytes.
75-
headerSize int
79+
// headerSizeMap is a map of height to expected header size.
80+
headerSizeMap map[int]int
7681

7782
chain Ledger
7883
pool poolWrapper
7984
enqueue func(obj any) error
8085
account *wallet.Account
8186

82-
oidsCh chan oid.ID
87+
oidsCh chan indexedOID
8388
// wg is a wait group for block downloaders.
8489
wg sync.WaitGroup
8590

@@ -97,7 +102,7 @@ type Service struct {
97102
shutdownCallback func()
98103

99104
// Depends on the OperationMode, the following functions are set to the appropriate functions.
100-
getFunc func(ctx context.Context, oid string) (io.ReadCloser, error)
105+
getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error)
101106
readFunc func(rc io.ReadCloser) (any, error)
102107
heightFunc func() uint32
103108
}
@@ -164,7 +169,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
164169
log: logger,
165170
cfg: cfg,
166171
operationMode: opt,
167-
headerSize: getHeaderSize(chain.GetConfig()),
172+
headerSizeMap: getHeaderSizeMap(chain.GetConfig()),
168173

169174
enqueue: put,
170175
account: account,
@@ -180,12 +185,17 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
180185
// * first full block of OIDs is processing by Downloader
181186
// * second full block of OIDs is available to be fetched by Downloader immediately
182187
// * third half-filled block of OIDs is being collected by OIDsFetcher.
183-
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
188+
oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize),
184189
}, nil
185190
}
186191

187-
func getHeaderSize(chain config.Blockchain) int {
188-
return block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
192+
func getHeaderSizeMap(chain config.Blockchain) map[int]int {
193+
headerSizeMap := make(map[int]int)
194+
headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
195+
for height := range chain.CommitteeHistory {
196+
headerSizeMap[int(height)] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(height))
197+
}
198+
return headerSizeMap
189199
}
190200

191201
// Start runs the NeoFS BlockFetcher service.
@@ -276,11 +286,13 @@ func (bfs *Service) oidDownloader() {
276286
func (bfs *Service) blockDownloader() {
277287
defer bfs.wg.Done()
278288

279-
for blkOid := range bfs.oidsCh {
289+
for indexedOid := range bfs.oidsCh {
290+
index := indexedOid.Index
291+
blkOid := indexedOid.OID
280292
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
281293
defer cancel()
282294

283-
rc, err := bfs.getFunc(ctx, blkOid.String())
295+
rc, err := bfs.getFunc(ctx, blkOid.String(), index)
284296
if err != nil {
285297
if isContextCanceledErr(err) {
286298
return
@@ -346,15 +358,15 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
346358

347359
blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
348360
defer blockCancel()
349-
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
361+
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1)
350362
if err != nil {
351363
if isContextCanceledErr(err) {
352364
return nil
353365
}
354366
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
355367
}
356368

357-
err = bfs.streamBlockOIDs(oidsRC, int(skip))
369+
err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip))
358370
if err != nil {
359371
if isContextCanceledErr(err) {
360372
return nil
@@ -369,7 +381,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
369381
}
370382

371383
// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
372-
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
384+
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error {
373385
defer rc.Close()
374386
oidBytes := make([]byte, oid.Size)
375387
oidsProcessed := 0
@@ -396,7 +408,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
396408
select {
397409
case <-bfs.exiterToOIDDownloader:
398410
return nil
399-
case bfs.oidsCh <- oidBlock:
411+
case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}:
400412
}
401413

402414
oidsProcessed++
@@ -441,12 +453,14 @@ func (bfs *Service) fetchOIDsBySearch() error {
441453
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
442454
return nil
443455
}
456+
index := int(startIndex)
444457
for _, oid := range blockOids {
445458
select {
446459
case <-bfs.exiterToOIDDownloader:
447460
return nil
448-
case bfs.oidsCh <- oid:
461+
case bfs.oidsCh <- indexedOID{Index: index, OID: oid}:
449462
}
463+
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.
450464
}
451465
startIndex += batchSize
452466
}
@@ -580,7 +594,7 @@ func (bfs *Service) retry(action func() error) error {
580594
return err
581595
}
582596

583-
func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
597+
func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) {
584598
u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid))
585599
if err != nil {
586600
return nil, err
@@ -593,8 +607,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
593607
return rc, err
594608
}
595609

596-
func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) {
597-
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize))
610+
func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) {
611+
nearestHeight := 0
612+
for h := range bfs.headerSizeMap {
613+
if h <= height && h > nearestHeight {
614+
nearestHeight = h
615+
}
616+
if nearestHeight >= height {
617+
break
618+
}
619+
}
620+
621+
size := bfs.headerSizeMap[nearestHeight]
622+
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size))
598623
if err != nil {
599624
return nil, err
600625
}

0 commit comments

Comments
 (0)