Skip to content

Commit

Permalink
feat(relayer): queue fix (#16260)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey authored Mar 3, 2024
1 parent ae26d8a commit f416ffb
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 31 deletions.
6 changes: 0 additions & 6 deletions packages/relayer/indexer/detect_and_handle_reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ import (
)

func (i *Indexer) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error {
// dont check on crawling past blocks, it will be a secondary indexer.
// we expect to see duplicates in this mode.
if i.watchMode == CrawlPastBlocks {
return nil
}

e, err := i.eventRepo.FirstByEventAndMsgHash(ctx, eventType, msgHash)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.FirstByMsgHash")
Expand Down
14 changes: 9 additions & 5 deletions packages/relayer/indexer/handle_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ func (i *Indexer) handleEvent(
return nil
}

// check if we have seen this event and msgHash before - if we have, it is being reorged.
if err := i.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil {
return errors.Wrap(err, "svc.detectAndHandleReorg")
}

// we should never see an empty msgHash, but if we do, we dont process.
if event.MsgHash == relayer.ZeroHash {
slog.Warn("Zero msgHash found. This is unexpected. Returning early")
Expand All @@ -61,6 +56,11 @@ func (i *Indexer) handleEvent(
// only wait for confirmations when not crawling past blocks.
// these are guaranteed to be confirmed since the blocks are old.
if i.watchMode != CrawlPastBlocks {
// check if we have seen this event and msgHash before - if we have, it is being reorged.
if err := i.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil {
return errors.Wrap(err, "svc.detectAndHandleReorg")
}

// we need to wait for confirmations to confirm this event is not being reverted,
// removed, or reorged now.
confCtx, confCtxCancel := context.WithTimeout(ctx, defaultCtxTimeout)
Expand Down Expand Up @@ -141,11 +141,15 @@ func (i *Indexer) handleEvent(
if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status {
// If the status from contract matches the existing event status,
// we can return early as this message has been processed as expected.
slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status)

return nil
}

// If the status from contract is done, update the database
if i.watchMode == CrawlPastBlocks && eventStatus == relayer.EventStatusDone {
slog.Info("updating status for msgHash", "msgHash", common.Hash(event.MsgHash).Hex())

if err := i.eventRepo.UpdateStatus(ctx, id, relayer.EventStatusDone); err != nil {
return errors.Wrap(err, fmt.Sprintf("i.eventRepo.UpdateStatus, id: %v", id))
}
Expand Down
44 changes: 26 additions & 18 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,20 @@ func (i *Indexer) filter(ctx context.Context) error {
return errors.Wrap(err, "bridge.FilterMessageSent")
}

if !messageSentEvents.Next() || messageSentEvents.Event == nil {
// use "end" not "filterEnd" here, because it will be used as the start
// of the next batch.
if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
return errors.Wrap(err, "i.handleNoEventsInBatch")
}

continue
}

group, groupCtx := errgroup.WithContext(ctx)
group.SetLimit(i.numGoroutines)

for messageSentEvents.Next() {
for {
event := messageSentEvents.Event

group.Go(func() error {
Expand All @@ -349,17 +359,21 @@ func (i *Indexer) filter(ctx context.Context) error {

return nil
})
}

// wait for the last of the goroutines to finish
if err := group.Wait(); err != nil {
return errors.Wrap(err, "group.Wait")
}
// if there are no more events
if !messageSentEvents.Next() {
// wait for the last of the goroutines to finish
if err := group.Wait(); err != nil {
return errors.Wrap(err, "group.Wait")
}
// handle no events remaining, saving the processing block and restarting the for
// loop
if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
return errors.Wrap(err, "i.handleNoEventsInBatch")
}

// handle no events remaining, saving the processing block and restarting the for
// loop
if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
return errors.Wrap(err, "i.handleNoEventsInBatch")
break
}
}
}

Expand All @@ -379,16 +393,10 @@ func (i *Indexer) filter(ctx context.Context) error {
return errors.Wrap(err, "i.srcEthClient.HeaderByNumber")
}

latestBlockIDToCompare := latestBlock.Number.Uint64()

if i.watchMode == CrawlPastBlocks && latestBlockIDToCompare > i.numLatestBlocksToIgnoreWhenCrawling {
latestBlockIDToCompare -= i.numLatestBlocksToIgnoreWhenCrawling
}

if i.processingBlockHeight < latestBlockIDToCompare {
if i.processingBlockHeight < latestBlock.Number.Uint64() {
slog.Info("header has advanced",
"processingBlockHeight", i.processingBlockHeight,
"latestBlock", latestBlockIDToCompare,
"latestBlock", latestBlock.Number.Uint64(),
)

return i.filter(ctx)
Expand Down
16 changes: 14 additions & 2 deletions packages/relayer/pkg/queue/rabbitmq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ func (r *RabbitMQ) Ack(ctx context.Context, msg queue.Message) error {

err := rmqMsg.Ack(false)

slog.Info("attempted acknowledge rabbitmq message")

if err != nil {
slog.Error("error acknowledging rabbitmq message", "err", err.Error())
return err
Expand Down Expand Up @@ -214,6 +212,13 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error {
slog.Error("rabbitmq notify close connection")
}

r.Close(ctx)

if err := r.connect(); err != nil {
slog.Error("error connecting to rabbitmq", "err", err.Error())
return err
}

return queue.ErrClosed
case err := <-r.chErrCh:
if err != nil {
Expand All @@ -222,6 +227,13 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error {
slog.Error("rabbitmq notify close channel")
}

r.Close(ctx)

if err := r.connect(); err != nil {
slog.Error("error connecting to rabbitmq", "err", err.Error())
return err
}

return queue.ErrClosed
case returnMsg := <-r.notifyReturnCh:
slog.Error("rabbitmq notify return", "id", returnMsg.MessageId, "err", returnMsg.ReplyText)
Expand Down

0 comments on commit f416ffb

Please sign in to comment.