Skip to content

Commit 91562d6

Browse files
authored
Don't query reorgable blocks before all chains reach reorg threshold (#680)
* Add is in reorg threshold metric * Set block lag when outside of reorg threshold * Refactor reorg threshold entry * Enter reorg threshold when there are no events * Fix tests
1 parent 2b127cb commit 91562d6

File tree

9 files changed

+431
-328
lines changed

9 files changed

+431
-328
lines changed

codegenerator/cli/npm/envio/src/FetchState.res

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ type t = {
6868
// Fields computed by updateInternal
6969
latestFullyFetchedBlock: blockNumberAndTimestamp,
7070
// How much blocks behind the head we should query
71-
// Added for the purpose of avoiding reorg handling
72-
blockLag: option<int>,
71+
// Needed to query before entering reorg threshold
72+
blockLag: int,
7373
//Items ordered from latest to earliest
7474
queue: array<Internal.eventItem>,
7575
}
@@ -229,6 +229,7 @@ let updateInternal = (
229229
~dcsToStore=fetchState.dcsToStore,
230230
~currentBlockHeight=?,
231231
~queue=fetchState.queue,
232+
~blockLag=fetchState.blockLag,
232233
): t => {
233234
let firstPartition = partitions->Js.Array2.unsafe_get(0)
234235
let latestFullyFetchedBlock = ref(firstPartition.latestFetchedBlock)
@@ -284,7 +285,7 @@ let updateInternal = (
284285
latestFullyFetchedBlock,
285286
indexingContracts,
286287
dcsToStore,
287-
blockLag: fetchState.blockLag,
288+
blockLag,
288289
queue,
289290
}
290291
}
@@ -717,13 +718,12 @@ let getNextQuery = (
717718
~currentBlockHeight,
718719
~stateId,
719720
) => {
720-
if currentBlockHeight === 0 {
721+
let headBlock = currentBlockHeight - blockLag
722+
if headBlock <= 0 {
721723
WaitingForNewBlock
722724
} else if concurrencyLimit === 0 {
723725
ReachedMaxConcurrency
724726
} else {
725-
let headBlock = currentBlockHeight - blockLag->Option.getWithDefault(0)
726-
727727
let fullPartitions = []
728728
let mergingPartitions = []
729729
let areMergingPartitionsFetching = ref(false)
@@ -823,14 +823,14 @@ let getNextQuery = (
823823
switch p->makePartitionQuery(
824824
~indexingContracts,
825825
~endBlock=switch blockLag {
826-
| Some(_) =>
826+
| 0 => endBlock
827+
| _ =>
827828
switch endBlock {
828829
| Some(endBlock) => Some(Pervasives.min(headBlock, endBlock))
829830
// Force head block as an endBlock when blockLag is set
830831
// because otherwise HyperSync might return bigger range
831832
| None => Some(headBlock)
832833
}
833-
| None => endBlock
834834
},
835835
~mergeTarget,
836836
) {
@@ -893,18 +893,6 @@ let queueItemBlockNumber = (queueItem: queueItem) => {
893893
}
894894
}
895895

896-
let queueItemIsInReorgThreshold = (
897-
queueItem: queueItem,
898-
~currentBlockHeight,
899-
~highestBlockBelowThreshold,
900-
) => {
901-
if currentBlockHeight === 0 {
902-
false
903-
} else {
904-
queueItem->queueItemBlockNumber > highestBlockBelowThreshold
905-
}
906-
}
907-
908896
/**
909897
Simple constructor for no item from partition
910898
*/
@@ -969,7 +957,7 @@ let make = (
969957
~contracts: array<indexingContract>,
970958
~maxAddrInPartition,
971959
~chainId,
972-
~blockLag=?,
960+
~blockLag=0,
973961
): t => {
974962
let latestFetchedBlock = {
975963
blockTimestamp: 0,
@@ -1226,3 +1214,15 @@ let isActivelyIndexing = ({latestFullyFetchedBlock, endBlock} as fetchState: t)
12261214
| None => true
12271215
}
12281216
}
1217+
1218+
let isReadyToEnterReorgThreshold = (
1219+
{latestFullyFetchedBlock, endBlock, blockLag, queue}: t,
1220+
~currentBlockHeight,
1221+
) => {
1222+
currentBlockHeight !== 0 &&
1223+
switch endBlock {
1224+
| Some(endBlock) if latestFullyFetchedBlock.blockNumber >= endBlock => true
1225+
| _ => latestFullyFetchedBlock.blockNumber >= currentBlockHeight - blockLag
1226+
} &&
1227+
queue->Utils.Array.isEmpty
1228+
}

codegenerator/cli/npm/envio/src/Prometheus.res

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,17 @@ module ReorgDetectionBlockNumber = {
472472
}
473473
}
474474

475+
module ReorgThreshold = {
476+
let gauge = PromClient.Gauge.makeGauge({
477+
"name": "envio_reorg_threshold",
478+
"help": "Whether indexing is currently within the reorg threshold",
479+
})
480+
481+
let set = (~isInReorgThreshold) => {
482+
gauge->PromClient.Gauge.set(isInReorgThreshold ? 1 : 0)
483+
}
484+
}
485+
475486
module RollbackEnabled = {
476487
let gauge = PromClient.Gauge.makeGauge({
477488
"name": "envio_rollback_enabled",

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@ let make = (
3636
~endBlock,
3737
~dbFirstEventBlockNumber,
3838
~latestProcessedBlock,
39+
~config: Config.t,
3940
~logger,
4041
~timestampCaughtUpToHeadOrEndblock,
4142
~numEventsProcessed,
4243
~numBatchesFetched,
4344
~processingFilters,
4445
~maxAddrInPartition,
45-
~enableRawEvents,
46+
~isInReorgThreshold,
4647
): t => {
4748
// We don't need the router itself, but only validation logic,
4849
// since now event router is created for selection of events
@@ -73,7 +74,7 @@ let make = (
7374

7475
// Filter out non-preRegistration events on preRegistration phase
7576
// so we don't care about it in fetch state and workers anymore
76-
let shouldBeIncluded = if enableRawEvents {
77+
let shouldBeIncluded = if config.enableRawEvents {
7778
true
7879
} else {
7980
let isRegistered = hasContractRegister || eventConfig.handler->Option.isSome
@@ -133,7 +134,12 @@ let make = (
133134
~endBlock,
134135
~eventConfigs,
135136
~chainId=chainConfig.chain->ChainMap.Chain.toChainId,
136-
~blockLag=?Env.indexingBlockLag,
137+
~blockLag=Pervasives.max(
138+
!(config->Config.shouldRollbackOnReorg) || isInReorgThreshold
139+
? 0
140+
: chainConfig.confirmedBlockThreshold,
141+
Env.indexingBlockLag->Option.getWithDefault(0),
142+
),
137143
)
138144

139145
{
@@ -156,14 +162,15 @@ let make = (
156162
}
157163
}
158164

159-
let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition, ~enableRawEvents) => {
165+
let makeFromConfig = (chainConfig: Config.chainConfig, ~config, ~maxAddrInPartition) => {
160166
let logger = Logging.createChild(~params={"chainId": chainConfig.chain->ChainMap.Chain.toChainId})
161167
let lastBlockScannedHashes = ReorgDetection.LastBlockScannedHashes.empty(
162168
~confirmedBlockThreshold=chainConfig.confirmedBlockThreshold,
163169
)
164170

165171
make(
166172
~chainConfig,
173+
~config,
167174
~startBlock=chainConfig.startBlock,
168175
~endBlock=chainConfig.endBlock,
169176
~lastBlockScannedHashes,
@@ -176,7 +183,7 @@ let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition, ~ena
176183
~processingFilters=None,
177184
~dynamicContracts=[],
178185
~maxAddrInPartition,
179-
~enableRawEvents,
186+
~isInReorgThreshold=false,
180187
)
181188
}
182189

@@ -186,7 +193,8 @@ let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition, ~ena
186193
let makeFromDbState = async (
187194
chainConfig: Config.chainConfig,
188195
~maxAddrInPartition,
189-
~enableRawEvents,
196+
~isInReorgThreshold,
197+
~config,
190198
~sql=Db.sql,
191199
) => {
192200
let logger = Logging.createChild(~params={"chainId": chainConfig.chain->ChainMap.Chain.toChainId})
@@ -288,6 +296,7 @@ let makeFromDbState = async (
288296
~chainConfig,
289297
~startBlock=restartBlockNumber,
290298
~endBlock=chainConfig.endBlock,
299+
~config,
291300
~lastBlockScannedHashes,
292301
~dbFirstEventBlockNumber=firstEventBlockNumber,
293302
~latestProcessedBlock=latestProcessedBlockChainMetadata,
@@ -297,7 +306,7 @@ let makeFromDbState = async (
297306
~logger,
298307
~processingFilters,
299308
~maxAddrInPartition,
300-
~enableRawEvents,
309+
~isInReorgThreshold,
301310
)
302311
}
303312

0 commit comments

Comments
 (0)