From b2d5b90a0c3ab21d56f910109aa97b7f5606002b Mon Sep 17 00:00:00 2001 From: xiaodino Date: Mon, 19 Feb 2024 23:15:14 -0800 Subject: [PATCH 01/10] feat(guardian-prover-health-check): add indexer in MySQL (#15945) --- ...1003_alter_health_checks_guardian_prover_id_index.sql | 9 +++++++++ .../1666651004_alter_signed_blocks_block_id_index.sql | 9 +++++++++ 2 files changed, 18 insertions(+) create mode 100644 packages/guardian-prover-health-check/migrations/1666651003_alter_health_checks_guardian_prover_id_index.sql create mode 100644 packages/guardian-prover-health-check/migrations/1666651004_alter_signed_blocks_block_id_index.sql diff --git a/packages/guardian-prover-health-check/migrations/1666651003_alter_health_checks_guardian_prover_id_index.sql b/packages/guardian-prover-health-check/migrations/1666651003_alter_health_checks_guardian_prover_id_index.sql new file mode 100644 index 0000000000..c8a49fb649 --- /dev/null +++ b/packages/guardian-prover-health-check/migrations/1666651003_alter_health_checks_guardian_prover_id_index.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE `health_checks` ADD INDEX `health_checks_guardian_prover_id_index` (`guardian_prover_id`); + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +DROP INDEX health_checks_guardian_prover_id_index on health_checks; +-- +goose StatementEnd diff --git a/packages/guardian-prover-health-check/migrations/1666651004_alter_signed_blocks_block_id_index.sql b/packages/guardian-prover-health-check/migrations/1666651004_alter_signed_blocks_block_id_index.sql new file mode 100644 index 0000000000..078e00bd05 --- /dev/null +++ b/packages/guardian-prover-health-check/migrations/1666651004_alter_signed_blocks_block_id_index.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE `signed_blocks` ADD INDEX `signed_blocks_block_id_index` (`block_id`); + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +DROP INDEX signed_blocks_block_id_index on signed_blocks; +-- +goose StatementEnd From 98abd8b55342cdd71bcbc4925a62b59ab7e6e899 Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 16:16:41 -0700 Subject: [PATCH 02/10] fix(relayer): restart indexer when the indexer exits due to faield API requests --- packages/relayer/indexer/indexer.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index 31483c9000..b79a335b90 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -218,10 +218,6 @@ func (i *Indexer) Start() error { i.wg.Add(1) go func() { - defer func() { - i.wg.Done() - }() - if err := i.filter(i.ctx); err != nil { slog.Error("error filtering blocks", "error", err.Error()) } @@ -334,6 +330,7 @@ func (i *Indexer) filter(ctx context.Context) error { if !messageSentEvents.Next() || messageSentEvents.Event == nil { // use "end" not "filterEnd" here, because it will be used as the start // of the next batch. + if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { return errors.Wrap(err, "i.handleNoEventsInBatch") } @@ -370,6 +367,7 @@ func (i *Indexer) filter(ctx context.Context) error { // loop if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { return errors.Wrap(err, "i.handleNoEventsInBatch") + } break From a9e43a5d02bd860147a3e47661fb56bc99818feb Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 16:17:53 -0700 Subject: [PATCH 03/10] Update --- packages/relayer/indexer/indexer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index b79a335b90..495a6bef8c 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -330,7 +330,6 @@ func (i *Indexer) filter(ctx context.Context) error { if !messageSentEvents.Next() || messageSentEvents.Event == nil { // use "end" not "filterEnd" here, because it will be used as the start // of the next batch. - if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { return errors.Wrap(err, "i.handleNoEventsInBatch") } @@ -369,7 +368,6 @@ func (i *Indexer) filter(ctx context.Context) error { return errors.Wrap(err, "i.handleNoEventsInBatch") } - break } } From 32f9e3d48912e9e3dc70ff8497a2377e57de3ce8 Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 16:20:20 -0700 Subject: [PATCH 04/10] Update --- packages/relayer/indexer/indexer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index 495a6bef8c..820d1fd4b0 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -368,6 +368,7 @@ func (i *Indexer) filter(ctx context.Context) error { return errors.Wrap(err, "i.handleNoEventsInBatch") } + break } } From c32f20b0f5537eaee430afd1764ecb575be3b5ff Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 16:49:41 -0700 Subject: [PATCH 05/10] Update --- packages/relayer/indexer/handle_event.go | 5 +++-- packages/relayer/indexer/indexer.go | 2 -- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index 0b548f674a..5e8cb4c7b4 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -93,6 +93,8 @@ func (i *Indexer) handleEvent( return errors.Wrap(err, "eventTypeAmountAndCanonicalTokenFromEvent(event)") } + // TODO(xiaodino): Change to batch query + // check if we have an existing event already. this is mostly likely only true // in the case of us crawling past blocks. existingEvent, err := i.eventRepo.FirstByEventAndMsgHash( @@ -141,8 +143,7 @@ func (i *Indexer) handleEvent( if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status { // If the status from contract matches the existing event status, // we can return early as this message has been processed as expected. - slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) - + // slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) return nil } diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index 820d1fd4b0..cec315f60b 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -349,8 +349,6 @@ func (i *Indexer) filter(ctx context.Context) error { relayer.ErrorEvents.Inc() // log error but always return nil to keep other goroutines active slog.Error("error handling event", "err", err.Error()) - } else { - slog.Info("handled event successfully") } return nil From 4a3dfa26116878ce31588ec7dce05e2d5fc65033 Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 16:51:30 -0700 Subject: [PATCH 06/10] Update --- packages/relayer/indexer/handle_event.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index 5e8cb4c7b4..d976ce3a93 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -28,8 +28,7 @@ func (i *Indexer) handleEvent( chainID *big.Int, event *bridge.BridgeMessageSent, ) error { - slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex()) - + // slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex()) // if the destinatio chain doesnt match, we dont process it in this indexer. if new(big.Int).SetUint64(event.Message.DestChainId).Cmp(i.destChainId) != 0 { slog.Info("skipping event, wrong chainID", From caf3cff4b4c3f19b6e6e641b8c4c3e13a5882455 Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 22:38:28 -0700 Subject: [PATCH 07/10] Update --- packages/relayer/indexer/handle_event.go | 7 +++--- packages/relayer/indexer/indexer.go | 27 ++++++++++++++++++------ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index d976ce3a93..8e048e3282 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -28,8 +28,9 @@ func (i *Indexer) handleEvent( chainID *big.Int, event *bridge.BridgeMessageSent, ) error { - // slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex()) + slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex()) // if the destinatio chain doesnt match, we dont process it in this indexer. + if new(big.Int).SetUint64(event.Message.DestChainId).Cmp(i.destChainId) != 0 { slog.Info("skipping event, wrong chainID", "messageDestChainID", @@ -92,8 +93,6 @@ func (i *Indexer) handleEvent( return errors.Wrap(err, "eventTypeAmountAndCanonicalTokenFromEvent(event)") } - // TODO(xiaodino): Change to batch query - // check if we have an existing event already. this is mostly likely only true // in the case of us crawling past blocks. existingEvent, err := i.eventRepo.FirstByEventAndMsgHash( @@ -142,7 +141,7 @@ func (i *Indexer) handleEvent( if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status { // If the status from contract matches the existing event status, // we can return early as this message has been processed as expected. - // slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) + slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) return nil } diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index cec315f60b..d41c937773 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -218,14 +218,24 @@ func (i *Indexer) Start() error { i.wg.Add(1) go func() { - if err := i.filter(i.ctx); err != nil { - slog.Error("error filtering blocks", "error", err.Error()) + if err := backoff.Retry(func() error { + err := i.filter(i.ctx) + if err != nil { + slog.Error("filter failed, will retry", "error", err) + } + return err + }, backoff.NewConstantBackOff(5*time.Second)); err != nil { + slog.Error("error after retrying filter with backoff", "error", err) } }() go func() { if err := backoff.Retry(func() error { - return scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg) + err := scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg) + if err != nil { + slog.Error("scanBlocks failed, will retry", "error", err) + } + return err }, backoff.NewConstantBackOff(5*time.Second)); err != nil { slog.Error("scan blocks backoff retry", "error", err) } @@ -233,9 +243,13 @@ func (i *Indexer) Start() error { go func() { if err := backoff.Retry(func() error { - return i.queue.Notify(i.ctx, i.wg) + err := i.queue.Notify(i.ctx, i.wg) + if err != nil { + slog.Error("i.queue.Notify failed, will retry", "error", err) + } + return err }, backoff.NewConstantBackOff(5*time.Second)); err != nil { - slog.Error("queue notify backoff retry", "error", err) + slog.Error("i.queue.Notify backoff retry", "error", err) } }() @@ -349,6 +363,8 @@ func (i *Indexer) filter(ctx context.Context) error { relayer.ErrorEvents.Inc() // log error but always return nil to keep other goroutines active slog.Error("error handling event", "err", err.Error()) + } else { + slog.Info("handled event successfully") } return nil @@ -364,7 +380,6 @@ func (i *Indexer) filter(ctx context.Context) error { // loop if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { return errors.Wrap(err, "i.handleNoEventsInBatch") - } break From cb35a58e797eab99f24911988fee53d6b296ae82 Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 22:40:10 -0700 Subject: [PATCH 08/10] Update --- packages/relayer/indexer/handle_event.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index 8e048e3282..0b548f674a 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -29,8 +29,8 @@ func (i *Indexer) handleEvent( event *bridge.BridgeMessageSent, ) error { slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex()) - // if the destinatio chain doesnt match, we dont process it in this indexer. + // if the destinatio chain doesnt match, we dont process it in this indexer. if new(big.Int).SetUint64(event.Message.DestChainId).Cmp(i.destChainId) != 0 { slog.Info("skipping event, wrong chainID", "messageDestChainID", @@ -142,6 +142,7 @@ func (i *Indexer) handleEvent( // If the status from contract matches the existing event status, // we can return early as this message has been processed as expected. slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) + return nil } From c8da85bea96ad9038e88c5f080b56d3f426d1ad6 Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 22:58:54 -0700 Subject: [PATCH 09/10] Update --- packages/relayer/indexer/indexer.go | 3 +++ .../relayer/indexer/set_initial_processing_block_by_mode.go | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index d41c937773..fa254eb675 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -393,6 +393,9 @@ func (i *Indexer) filter(ctx context.Context) error { if i.watchMode == CrawlPastBlocks { slog.Info("restarting filtering from genesis") + + i.processingBlockHeight = 0 + return i.filter(ctx) } diff --git a/packages/relayer/indexer/set_initial_processing_block_by_mode.go b/packages/relayer/indexer/set_initial_processing_block_by_mode.go index 2df25e038a..bb980711e6 100644 --- a/packages/relayer/indexer/set_initial_processing_block_by_mode.go +++ b/packages/relayer/indexer/set_initial_processing_block_by_mode.go @@ -43,7 +43,12 @@ func (i *Indexer) setInitialProcessingBlockByMode( return nil case Resync: + if i.watchMode == CrawlPastBlocks && i.processingBlockHeight > startingBlock { + return nil + } + i.processingBlockHeight = startingBlock + return nil default: return relayer.ErrInvalidMode From f36d1c08a5816be2f3fce3f7f32869e372ef5f4a Mon Sep 17 00:00:00 2001 From: xiaodino Date: Mon, 18 Mar 2024 00:30:35 -0700 Subject: [PATCH 10/10] Update --- packages/relayer/indexer/indexer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index fa254eb675..02294a5bdb 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -224,7 +224,7 @@ func (i *Indexer) Start() error { slog.Error("filter failed, will retry", "error", err) } return err - }, backoff.NewConstantBackOff(5*time.Second)); err != nil { + }, backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), i.ctx)); err != nil { slog.Error("error after retrying filter with backoff", "error", err) } }() @@ -236,7 +236,7 @@ func (i *Indexer) Start() error { slog.Error("scanBlocks failed, will retry", "error", err) } return err - }, backoff.NewConstantBackOff(5*time.Second)); err != nil { + }, backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), i.ctx)); err != nil { slog.Error("scan blocks backoff retry", "error", err) } }() @@ -248,7 +248,7 @@ func (i *Indexer) Start() error { slog.Error("i.queue.Notify failed, will retry", "error", err) } return err - }, backoff.NewConstantBackOff(5*time.Second)); err != nil { + }, backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), i.ctx)); err != nil { slog.Error("i.queue.Notify backoff retry", "error", err) } }()