diff --git a/packages/relayer/indexer/detect_and_handle_reorg.go b/packages/relayer/indexer/detect_and_handle_reorg.go index 30d5efa35d..c82c75f1d9 100644 --- a/packages/relayer/indexer/detect_and_handle_reorg.go +++ b/packages/relayer/indexer/detect_and_handle_reorg.go @@ -9,12 +9,6 @@ import ( ) func (i *Indexer) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error { - // dont check on crawling past blocks, it will be a secondary indexer. - // we expect to see duplicates in this mode. - if i.watchMode == CrawlPastBlocks { - return nil - } - e, err := i.eventRepo.FirstByEventAndMsgHash(ctx, eventType, msgHash) if err != nil { return errors.Wrap(err, "svc.eventRepo.FirstByMsgHash") diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index c07be55bf8..0b548f674a 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -47,11 +47,6 @@ func (i *Indexer) handleEvent( return nil } - // check if we have seen this event and msgHash before - if we have, it is being reorged. - if err := i.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil { - return errors.Wrap(err, "svc.detectAndHandleReorg") - } - // we should never see an empty msgHash, but if we do, we dont process. if event.MsgHash == relayer.ZeroHash { slog.Warn("Zero msgHash found. This is unexpected. Returning early") @@ -61,6 +56,11 @@ func (i *Indexer) handleEvent( // only wait for confirmations when not crawling past blocks. // these are guaranteed to be confirmed since the blocks are old. if i.watchMode != CrawlPastBlocks { + // check if we have seen this event and msgHash before - if we have, it is being reorged. + if err := i.detectAndHandleReorg(ctx, relayer.EventNameMessageSent, common.Hash(event.MsgHash).Hex()); err != nil { + return errors.Wrap(err, "svc.detectAndHandleReorg") + } + // we need to wait for confirmations to confirm this event is not being reverted, // removed, or reorged now. confCtx, confCtxCancel := context.WithTimeout(ctx, defaultCtxTimeout) @@ -141,11 +141,15 @@ func (i *Indexer) handleEvent( if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status { // If the status from contract matches the existing event status, // we can return early as this message has been processed as expected. + slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) + return nil } // If the status from contract is done, update the database if i.watchMode == CrawlPastBlocks && eventStatus == relayer.EventStatusDone { + slog.Info("updating status for msgHash", "msgHash", common.Hash(event.MsgHash).Hex()) + if err := i.eventRepo.UpdateStatus(ctx, id, relayer.EventStatusDone); err != nil { return errors.Wrap(err, fmt.Sprintf("i.eventRepo.UpdateStatus, id: %v", id)) } diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index 8c2f2cc23f..31483c9000 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -331,10 +331,20 @@ func (i *Indexer) filter(ctx context.Context) error { return errors.Wrap(err, "bridge.FilterMessageSent") } + if !messageSentEvents.Next() || messageSentEvents.Event == nil { + // use "end" not "filterEnd" here, because it will be used as the start + // of the next batch. + if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { + return errors.Wrap(err, "i.handleNoEventsInBatch") + } + + continue + } + group, groupCtx := errgroup.WithContext(ctx) group.SetLimit(i.numGoroutines) - for messageSentEvents.Next() { + for { event := messageSentEvents.Event group.Go(func() error { @@ -349,17 +359,21 @@ func (i *Indexer) filter(ctx context.Context) error { return nil }) - } - // wait for the last of the goroutines to finish - if err := group.Wait(); err != nil { - return errors.Wrap(err, "group.Wait") - } + // if there are no more events + if !messageSentEvents.Next() { + // wait for the last of the goroutines to finish + if err := group.Wait(); err != nil { + return errors.Wrap(err, "group.Wait") + } + // handle no events remaining, saving the processing block and restarting the for + // loop + if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { + return errors.Wrap(err, "i.handleNoEventsInBatch") + } - // handle no events remaining, saving the processing block and restarting the for - // loop - if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { - return errors.Wrap(err, "i.handleNoEventsInBatch") + break + } } } @@ -379,16 +393,10 @@ func (i *Indexer) filter(ctx context.Context) error { return errors.Wrap(err, "i.srcEthClient.HeaderByNumber") } - latestBlockIDToCompare := latestBlock.Number.Uint64() - - if i.watchMode == CrawlPastBlocks && latestBlockIDToCompare > i.numLatestBlocksToIgnoreWhenCrawling { - latestBlockIDToCompare -= i.numLatestBlocksToIgnoreWhenCrawling - } - - if i.processingBlockHeight < latestBlockIDToCompare { + if i.processingBlockHeight < latestBlock.Number.Uint64() { slog.Info("header has advanced", "processingBlockHeight", i.processingBlockHeight, - "latestBlock", latestBlockIDToCompare, + "latestBlock", latestBlock.Number.Uint64(), ) return i.filter(ctx) diff --git a/packages/relayer/pkg/queue/rabbitmq/queue.go b/packages/relayer/pkg/queue/rabbitmq/queue.go index 95012e9941..b1f5c6e74c 100644 --- a/packages/relayer/pkg/queue/rabbitmq/queue.go +++ b/packages/relayer/pkg/queue/rabbitmq/queue.go @@ -163,8 +163,6 @@ func (r *RabbitMQ) Ack(ctx context.Context, msg queue.Message) error { err := rmqMsg.Ack(false) - slog.Info("attempted acknowledge rabbitmq message") - if err != nil { slog.Error("error acknowledging rabbitmq message", "err", err.Error()) return err @@ -214,6 +212,13 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error { slog.Error("rabbitmq notify close connection") } + r.Close(ctx) + + if err := r.connect(); err != nil { + slog.Error("error connecting to rabbitmq", "err", err.Error()) + return err + } + return queue.ErrClosed case err := <-r.chErrCh: if err != nil { @@ -222,6 +227,13 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error { slog.Error("rabbitmq notify close channel") } + r.Close(ctx) + + if err := r.connect(); err != nil { + slog.Error("error connecting to rabbitmq", "err", err.Error()) + return err + } + return queue.ErrClosed case returnMsg := <-r.notifyReturnCh: slog.Error("rabbitmq notify return", "id", returnMsg.MessageId, "err", returnMsg.ReplyText)