feat(relayer): queue fix #16260

Merged · 7 commits · Mar 3, 2024
6 changes: 0 additions & 6 deletions packages/relayer/indexer/detect_and_handle_reorg.go
@@ -9,12 +9,6 @@ import (
)

func (i *Indexer) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error {
// dont check on crawling past blocks, it will be a secondary indexer.
// we expect to see duplicates in this mode.
if i.watchMode == CrawlPastBlocks {
return nil
}

e, err := i.eventRepo.FirstByEventAndMsgHash(ctx, eventType, msgHash)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.FirstByMsgHash")
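For reference, a minimal sketch of the trimmed function after this hunk; the names are taken from the diff and the rest of the body is elided rather than reproduced:

```go
// Sketch only: the CrawlPastBlocks early return is gone, so callers now decide
// whether a reorg check applies (see handle_event.go below).
func (i *Indexer) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error {
	e, err := i.eventRepo.FirstByEventAndMsgHash(ctx, eventType, msgHash)
	if err != nil {
		return errors.Wrap(err, "svc.eventRepo.FirstByMsgHash")
	}

	_ = e // the remainder of the existing reorg handling is elided in this sketch

	return nil
}
```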
14 changes: 9 additions & 5 deletions packages/relayer/indexer/handle_event.go
@@ -47,11 +47,6 @@ func (i *Indexer) handleEvent(
return nil
}

// check if we have seen this event and msgHash before - if we have, it is being reorged.
if err := i.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil {
return errors.Wrap(err, "svc.detectAndHandleReorg")
}

// we should never see an empty msgHash, but if we do, we dont process.
if event.MsgHash == relayer.ZeroHash {
slog.Warn("Zero msgHash found. This is unexpected. Returning early")
@@ -61,6 +56,11 @@
// only wait for confirmations when not crawling past blocks.
// these are guaranteed to be confirmed since the blocks are old.
if i.watchMode != CrawlPastBlocks {
// check if we have seen this event and msgHash before - if we have, it is being reorged.
if err := i.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil {
return errors.Wrap(err, "svc.detectAndHandleReorg")
}

// we need to wait for confirmations to confirm this event is not being reverted,
// removed, or reorged now.
confCtx, confCtxCancel := context.WithTimeout(ctx, defaultCtxTimeout)
@@ -141,11 +141,15 @@
if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status {
// If the status from contract matches the existing event status,
// we can return early as this message has been processed as expected.
slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status)

return nil
}

// If the status from contract is done, update the database
if i.watchMode == CrawlPastBlocks && eventStatus == relayer.EventStatusDone {
slog.Info("updating status for msgHash", "msgHash", common.Hash(event.MsgHash).Hex())

if err := i.eventRepo.UpdateStatus(ctx, id, relayer.EventStatusDone); err != nil {
return errors.Wrap(err, fmt.Sprintf("i.eventRepo.UpdateStatus, id: %v", id))
}
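Taken together, the handle_event.go hunks gate the reorg check and the confirmation wait behind the live-indexing mode and let the past-block crawler reconcile statuses instead. A condensed sketch of that control flow, using the names from the diff (simplified, not the full function):

```go
// Live indexer only: detect reorgs and wait for confirmations.
if i.watchMode != CrawlPastBlocks {
	if err := i.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil {
		return errors.Wrap(err, "svc.detectAndHandleReorg")
	}

	confCtx, confCtxCancel := context.WithTimeout(ctx, defaultCtxTimeout)
	defer confCtxCancel()
	_ = confCtx // the confirmation-wait call itself is elided in this sketch
}

// Crawler: if the on-chain status already matches what we stored, nothing to do.
if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status {
	slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status)

	return nil
}

// Crawler: if the message is done on-chain, bring the database up to date.
if i.watchMode == CrawlPastBlocks && eventStatus == relayer.EventStatusDone {
	slog.Info("updating status for msgHash", "msgHash", common.Hash(event.MsgHash).Hex())

	if err := i.eventRepo.UpdateStatus(ctx, id, relayer.EventStatusDone); err != nil {
		return errors.Wrap(err, fmt.Sprintf("i.eventRepo.UpdateStatus, id: %v", id))
	}
}
```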
44 changes: 26 additions & 18 deletions packages/relayer/indexer/indexer.go
@@ -331,10 +331,20 @@ func (i *Indexer) filter(ctx context.Context) error {
return errors.Wrap(err, "bridge.FilterMessageSent")
}

if !messageSentEvents.Next() || messageSentEvents.Event == nil {
// use "end" not "filterEnd" here, because it will be used as the start
// of the next batch.
if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
return errors.Wrap(err, "i.handleNoEventsInBatch")
}

continue
}

group, groupCtx := errgroup.WithContext(ctx)
group.SetLimit(i.numGoroutines)

for messageSentEvents.Next() {
for {
event := messageSentEvents.Event

group.Go(func() error {
@@ -349,17 +359,21 @@

return nil
})
}

// wait for the last of the goroutines to finish
if err := group.Wait(); err != nil {
return errors.Wrap(err, "group.Wait")
}
// if there are no more events
if !messageSentEvents.Next() {
// wait for the last of the goroutines to finish
if err := group.Wait(); err != nil {
return errors.Wrap(err, "group.Wait")
}
// handle no events remaining, saving the processing block and restarting the for
// loop
if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
return errors.Wrap(err, "i.handleNoEventsInBatch")
}

// handle no events remaining, saving the processing block and restarting the for
// loop
if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
return errors.Wrap(err, "i.handleNoEventsInBatch")
break
}
}
}
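
A minimal sketch of the restructured batch loop in this hunk, assuming an abigen-style iterator (Next/Event) and the errgroup usage shown above; the handleEvent call shape is assumed, not copied from the file:

```go
// If the batch contains no events at all, record progress at "end" and move on
// to the next batch immediately.
if !messageSentEvents.Next() || messageSentEvents.Event == nil {
	if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
		return errors.Wrap(err, "i.handleNoEventsInBatch")
	}

	continue
}

group, groupCtx := errgroup.WithContext(ctx)
group.SetLimit(i.numGoroutines)

// Next() has already been called once above, so each iteration handles the
// current event first and only advances the iterator at the end.
for {
	event := messageSentEvents.Event

	group.Go(func() error {
		return i.handleEvent(groupCtx, event) // assumed call shape
	})

	if !messageSentEvents.Next() {
		// All events in this batch are dispatched: wait for the goroutines,
		// then save the processing block before starting the next batch.
		if err := group.Wait(); err != nil {
			return errors.Wrap(err, "group.Wait")
		}

		if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
			return errors.Wrap(err, "i.handleNoEventsInBatch")
		}

		break
	}
}
```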

@@ -379,16 +393,10 @@ func (i *Indexer) filter(ctx context.Context) error {
return errors.Wrap(err, "i.srcEthClient.HeaderByNumber")
}

latestBlockIDToCompare := latestBlock.Number.Uint64()

if i.watchMode == CrawlPastBlocks && latestBlockIDToCompare > i.numLatestBlocksToIgnoreWhenCrawling {
latestBlockIDToCompare -= i.numLatestBlocksToIgnoreWhenCrawling
}

if i.processingBlockHeight < latestBlockIDToCompare {
if i.processingBlockHeight < latestBlock.Number.Uint64() {
slog.Info("header has advanced",
"processingBlockHeight", i.processingBlockHeight,
"latestBlock", latestBlockIDToCompare,
"latestBlock", latestBlock.Number.Uint64(),
)

return i.filter(ctx)
16 changes: 14 additions & 2 deletions packages/relayer/pkg/queue/rabbitmq/queue.go
@@ -163,8 +163,6 @@ func (r *RabbitMQ) Ack(ctx context.Context, msg queue.Message) error {

err := rmqMsg.Ack(false)

slog.Info("attempted acknowledge rabbitmq message")

if err != nil {
slog.Error("error acknowledging rabbitmq message", "err", err.Error())
return err
@@ -214,6 +212,13 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error {
slog.Error("rabbitmq notify close connection")
}

r.Close(ctx)

if err := r.connect(); err != nil {
slog.Error("error connecting to rabbitmq", "err", err.Error())
return err
}

return queue.ErrClosed
case err := <-r.chErrCh:
if err != nil {
@@ -222,6 +227,13 @@
slog.Error("rabbitmq notify close channel")
}

r.Close(ctx)

if err := r.connect(); err != nil {
slog.Error("error connecting to rabbitmq", "err", err.Error())
return err
}

return queue.ErrClosed
case returnMsg := <-r.notifyReturnCh:
slog.Error("rabbitmq notify return", "id", returnMsg.MessageId, "err", returnMsg.ReplyText)
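The Notify hunks add a reconnect attempt whenever the connection or channel reports a close. A minimal sketch of that pattern, assuming the wrapper's Close and connect methods shown in the diff; the connection-error channel name (connErrCh) is assumed, since only chErrCh appears in the hunk:

```go
// On either close notification: tear the wrapper down, try to reconnect, and
// still return queue.ErrClosed so the caller knows the current subscription ended.
select {
case err := <-r.connErrCh: // assumed name for the connection-close channel
	if err != nil {
		slog.Error("rabbitmq notify close connection", "err", err.Error())
	}

	r.Close(ctx)

	if err := r.connect(); err != nil {
		slog.Error("error connecting to rabbitmq", "err", err.Error())
		return err
	}

	return queue.ErrClosed
case err := <-r.chErrCh:
	if err != nil {
		slog.Error("rabbitmq notify close channel", "err", err.Error())
	}

	r.Close(ctx)

	if err := r.connect(); err != nil {
		slog.Error("error connecting to rabbitmq", "err", err.Error())
		return err
	}

	return queue.ErrClosed
}
```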