Skip to content

Commit

Permalink
fix(relayer): restart indexer and crawler when they exit due to faile…
Browse files Browse the repository at this point in the history
…d API requests (#16465)
  • Loading branch information
xiaodino authored Mar 18, 2024
1 parent f416ffb commit eb84395
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
35 changes: 24 additions & 11 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,28 +218,38 @@ func (i *Indexer) Start() error {
i.wg.Add(1)

go func() {
defer func() {
i.wg.Done()
}()

if err := i.filter(i.ctx); err != nil {
slog.Error("error filtering blocks", "error", err.Error())
if err := backoff.Retry(func() error {
err := i.filter(i.ctx)
if err != nil {
slog.Error("filter failed, will retry", "error", err)
}
return err
}, backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), i.ctx)); err != nil {
slog.Error("error after retrying filter with backoff", "error", err)
}
}()

go func() {
if err := backoff.Retry(func() error {
return scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
err := scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg)
if err != nil {
slog.Error("scanBlocks failed, will retry", "error", err)
}
return err
}, backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), i.ctx)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
}()

go func() {
if err := backoff.Retry(func() error {
return i.queue.Notify(i.ctx, i.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("queue notify backoff retry", "error", err)
err := i.queue.Notify(i.ctx, i.wg)
if err != nil {
slog.Error("i.queue.Notify failed, will retry", "error", err)
}
return err
}, backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), i.ctx)); err != nil {
slog.Error("i.queue.Notify backoff retry", "error", err)
}
}()

Expand Down Expand Up @@ -383,6 +393,9 @@ func (i *Indexer) filter(ctx context.Context) error {

if i.watchMode == CrawlPastBlocks {
slog.Info("restarting filtering from genesis")

i.processingBlockHeight = 0

return i.filter(ctx)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ func (i *Indexer) setInitialProcessingBlockByMode(

return nil
case Resync:
if i.watchMode == CrawlPastBlocks && i.processingBlockHeight > startingBlock {
return nil
}

i.processingBlockHeight = startingBlock

return nil
default:
return relayer.ErrInvalidMode
Expand Down

0 comments on commit eb84395

Please sign in to comment.