Skip to content

Commit f5856bb

Browse files
authored
Track indexer progress in the internal envio_chains table (#723)
* Merge event_sync_state into envio_chains * Track progressed chains * Commit block progress to the db * Fixes * Fix tests * Add resume log for reorg debugging
1 parent e35df5e commit f5856bb

File tree

28 files changed

+848
-717
lines changed

28 files changed

+848
-717
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/**
Progress made by a single chain within one batch.
NOTE(review): field meanings below are inferred from names and from how the
resume log reads the same-named columns — confirm against the batch builder.
*/
type progressedChain = {
  // Numeric id of the chain this progress entry belongs to
  chainId: int,
  // Number of items the chain contributed to the batch
  batchSize: int,
  // Presumably the last block that is fully covered by the progress
  progressBlockNumber: int,
  // Presumably the last covered log index inside the block after progressBlockNumber, if any
  progressNextBlockLogIndex: option<int>,
  // Running total of events processed for the chain
  totalEventsProcessed: int,
}
8+
9+
/**
A batch of event items together with the per-chain bookkeeping that accompanies it.
*/
type t = {
  // Event items included in the batch
  items: array<Internal.eventItem>,
  // One progress entry per chain that advanced with this batch
  progressedChains: array<progressedChain>,
  // Fetch state per chain
  fetchStates: ChainMap.t<FetchState.t>,
  // Indexing contracts to store, keyed by chain id
  // NOTE(review): "dcs" presumably stands for dynamic contracts — confirm
  dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
}
15+
16+
/**
Pairs a chain with the earliest queue item of its fetch state,
so that candidates from different chains can be compared with each other.
*/
type multiChainEventComparitor = {
  // Chain the candidate belongs to
  chain: ChainMap.Chain.t,
  // Earliest queue item of that chain (either a real item or a "no item" marker)
  earliestEvent: FetchState.queueItem,
}
20+
21+
/**
Builds the ordering key of an event item by passing its timestamp, chain id,
block number and log index to EventUtils.getEventComparator.
*/
let getComparitorFromItem = (queueItem: Internal.eventItem) =>
  EventUtils.getEventComparator({
    timestamp: queueItem.timestamp,
    chainId: queueItem.chain->ChainMap.Chain.toChainId,
    blockNumber: queueItem.blockNumber,
    logIndex: queueItem.logIndex,
  })
30+
31+
/**
Returns the ordering key for a queue item of the given chain.

- For a real item the key comes from getComparitorFromItem.
- For a "no item" marker the key is built from the latest fetched block
  of the chain, with the log index fixed to 0.
*/
let getQueueItemComparitor = (earliestQueueItem: FetchState.queueItem, ~chain) =>
  switch earliestQueueItem {
  | NoItem({latestFetchedBlock: {blockTimestamp, blockNumber}}) =>
    // Same tuple layout as for a real item: (timestamp, chainId, blockNumber, logIndex)
    (blockTimestamp, chain->ChainMap.Chain.toChainId, blockNumber, 0)
  | Item({item}) => getComparitorFromItem(item)
  }
42+
43+
/**
Returns true when the ordering key of `a` is strictly smaller than the one of `b`.
Equal keys are not considered earlier.
*/
let isQueueItemEarlier = (a: multiChainEventComparitor, b: multiChainEventComparitor): bool => {
  let keyOfA = getQueueItemComparitor(a.earliestEvent, ~chain=a.chain)
  let keyOfB = getQueueItemComparitor(b.earliestEvent, ~chain=b.chain)
  keyOfA < keyOfB
}
47+
48+
/**
Returns the earliest queue item among all actively indexing chains,
or None when no chain is actively indexing.

When two candidates are not strictly ordered, the chain visited later wins.
*/
let getOrderedNextItem = (fetchStates: ChainMap.t<FetchState.t>): option<
  multiChainEventComparitor,
> => {
  let earliest = ref(None)

  fetchStates
  ->ChainMap.entries
  ->Belt.Array.forEach(((chain, fetchState)) => {
    // Chains that are not actively indexing (e.g. reached their end block) are ignored
    if fetchState->FetchState.isActivelyIndexing {
      let candidate: multiChainEventComparitor = {
        chain,
        earliestEvent: fetchState->FetchState.getEarliestEvent,
      }
      switch earliest.contents {
      // Keep the current best only when it is strictly earlier than the candidate
      | Some(best) if isQueueItemEarlier(best, candidate) => ()
      | _ => earliest := Some(candidate)
      }
    }
  })

  earliest.contents
}
70+
71+
/**
Pops items off the chain queues in cross-chain order until either
`maxBatchSize` items are collected, the earliest candidate is a
"no item" marker, or no chain is actively indexing.

Every popped item increments the counter of its chain in `sizePerChain`.
Returns the collected items in the order they were popped.
*/
let popOrderedBatchItems = (
  ~maxBatchSize,
  ~fetchStates: ChainMap.t<FetchState.t>,
  ~sizePerChain: dict<int>,
) => {
  let items = []
  let isExhausted = ref(false)

  while !isExhausted.contents && items->Array.length < maxBatchSize {
    switch fetchStates->getOrderedNextItem {
    | Some({earliestEvent: Item({item, popItemOffQueue})}) =>
      popItemOffQueue()
      items->Js.Array2.push(item)->ignore
      sizePerChain->Utils.Dict.incrementByInt(item.chain->ChainMap.Chain.toChainId)
    // The earliest candidate has no item yet, or nothing is actively indexing: stop here
    | Some({earliestEvent: NoItem(_)})
    | None =>
      isExhausted := true
    }
  }

  items
}
98+
99+
/**
Collects up to `maxBatchSize` items without enforcing cross-chain ordering.

The fetch states are first prepared by FetchState.filterAndSortForUnorderedBatch
and then drained one by one, so that as many items as possible come from a
single chain. This way the loaders optimisations will hit more often.

For every chain that contributed at least one item, the number of contributed
items is written to `sizePerChain` under the chain id.
*/
let popUnorderedBatchItems = (
  ~maxBatchSize,
  ~fetchStates: ChainMap.t<FetchState.t>,
  ~sizePerChain: dict<int>,
) => {
  let items = []

  let preparedFetchStates =
    fetchStates
    ->ChainMap.values
    ->FetchState.filterAndSortForUnorderedBatch(~maxBatchSize)
  let preparedNumber = preparedFetchStates->Array.length

  let idx = ref(0)
  // The batch size always equals the number of collected items
  while items->Array.length < maxBatchSize && idx.contents < preparedNumber {
    let fetchState = preparedFetchStates->Js.Array2.unsafe_get(idx.contents)
    let batchSizeBeforeTheChain = items->Array.length

    // Drain the current chain until it has no item or the batch is full
    let isChainDrained = ref(false)
    while !isChainDrained.contents && items->Array.length < maxBatchSize {
      switch fetchState->FetchState.getEarliestEvent {
      | Item({item, popItemOffQueue}) =>
        popItemOffQueue()
        items->Js.Array2.push(item)->ignore
      | NoItem(_) => isChainDrained := true
      }
    }

    let chainBatchSize = items->Array.length - batchSizeBeforeTheChain
    if chainBatchSize > 0 {
      sizePerChain->Utils.Dict.setByInt(fetchState.chainId, chainBatchSize)
    }

    idx := idx.contents + 1
  }

  items
}

codegenerator/cli/npm/envio/src/FetchState.res

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,3 +1271,18 @@ let filterAndSortForUnorderedBatch = {
12711271
})
12721272
}
12731273
}
1274+
1275+
/**
Returns the block number up to which the fetch state counts as progressed.

When the last queue item sits at or below the latest fully fetched block,
the result is the block right before that item. Otherwise (including an
empty queue) it is the latest fully fetched block number.
NOTE(review): presumably the last queue element is the earliest pending item — confirm.
*/
let getProgressBlockNumber = ({latestFullyFetchedBlock, queue}: t) => {
  let fetchedBlockNumber = latestFullyFetchedBlock.blockNumber
  switch queue->Utils.Array.last {
  | None => fetchedBlockNumber
  | Some(item) =>
    fetchedBlockNumber >= item.blockNumber ? item.blockNumber - 1 : fetchedBlockNumber
  }
}
1281+
1282+
/**
Returns the log index right before the last queue item, when that item sits
at or below the latest fully fetched block and has a positive log index.
Returns None in every other case (empty queue, item beyond the fetched
block, or item with log index 0).
*/
let getProgressNextBlockLogIndex = ({queue, latestFullyFetchedBlock}: t) => {
  switch queue->Utils.Array.last {
  | None => None
  | Some(item) =>
    let isItemBlockFetched = latestFullyFetchedBlock.blockNumber >= item.blockNumber
    isItemBlockFetched && item.logIndex > 0 ? Some(item.logIndex - 1) : None
  }
}

codegenerator/cli/npm/envio/src/Persistence.res

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type storage = {
3232
~entities: array<Internal.entityConfig>=?,
3333
~enums: array<Internal.enumConfig<Internal.enum>>=?,
3434
) => promise<initialState>,
35-
loadInitialState: unit => promise<initialState>,
35+
resumeInitialState: unit => promise<initialState>,
3636
@raises("StorageError")
3737
loadByIdsOrThrow: 'item. (
3838
~ids: array<string>,
@@ -123,14 +123,12 @@ let init = {
123123
persistence.storageStatus = Initializing(promise)
124124
if reset || !(await persistence.storage.isInitialized()) {
125125
Logging.info(`Initializing the indexer storage...`)
126-
127126
let initialState = await persistence.storage.initialize(
128127
~entities=persistence.allEntities,
129128
~enums=persistence.allEnums,
130129
~chainConfigs,
131130
)
132-
133-
Logging.info(`The indexer storage is ready. Uploading cache...`)
131+
Logging.info(`The indexer storage is ready. Starting indexing!`)
134132
persistence.storageStatus = Ready(initialState)
135133
} else if (
136134
// In case of a race condition,
@@ -140,8 +138,29 @@ let init = {
140138
| _ => false
141139
}
142140
) {
143-
Logging.info(`The indexer storage is ready.`)
144-
persistence.storageStatus = Ready(await persistence.storage.loadInitialState())
141+
Logging.info(`Found existing indexer storage. Resuming indexing state...`)
142+
let initialState = await persistence.storage.resumeInitialState()
143+
persistence.storageStatus = Ready(initialState)
144+
let checkpoints = Js.Dict.empty()
145+
initialState.chains->Js.Array2.forEach(c => {
146+
let checkpoint = switch c.progressNextBlockLogIndex {
147+
| Value(
148+
logIndex,
149+
) => // Latest processed log index (not necessarily processed by the indexer)
150+
{
151+
"blockNumber": c.progressBlockNumber + 1,
152+
"logIndex": logIndex,
153+
}
154+
| Null =>
155+
// Or simply the latest processed block number (might be -1 if not set)
156+
c.progressBlockNumber->Utils.magic
157+
}
158+
checkpoints->Utils.Dict.setByInt(c.id, checkpoint)
159+
})
160+
Logging.info({
161+
"msg": `Successfully resumed indexing state! Continuing from the last checkpoint.`,
162+
"checkpoints": checkpoints,
163+
})
145164
}
146165
resolveRef.contents()
147166
}

codegenerator/cli/npm/envio/src/PgStorage.res

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ let makeInitializeTransaction = (
6363
~isEmptyPgSchema=false,
6464
) => {
6565
let generalTables = [
66-
InternalTable.EventSyncState.table,
6766
InternalTable.Chains.table,
6867
InternalTable.PersistedState.table,
6968
InternalTable.EndOfBlockRangeScannedData.table,
@@ -566,10 +565,10 @@ let make = (
566565
])
567566

568567
// Resolves to true when the configured schema already contains an envio
// internal table: either the chains table, or the legacy "event_sync_state"
// table created by indexers from before it was merged into the chains table.
let isInitialized = async () => {
  // The table-name alternatives must be parenthesized: in SQL, AND binds
  // tighter than OR, so without the parentheses a chains table in ANY schema
  // would satisfy the condition and the storage would wrongly count as initialized.
  let envioTables = await sql->Postgres.unsafe(
    `SELECT table_schema FROM information_schema.tables WHERE table_schema = '${pgSchema}' AND (table_name = 'event_sync_state' OR table_name = '${InternalTable.Chains.table.tableName}');`,
  )
  envioTables->Utils.Array.notEmpty
}
575574

@@ -671,7 +670,9 @@ let make = (
671670
// This means that the schema is used for something else than envio.
672671
!(
673672
schemaTableNames->Js.Array2.some(table =>
674-
table.tableName === InternalTable.EventSyncState.table.tableName
673+
table.tableName === InternalTable.Chains.table.tableName ||
674+
// Case for indexers created before event_sync_state was merged into envio_chains
675+
table.tableName === "event_sync_state"
675676
)
676677
)
677678
) {
@@ -893,7 +894,7 @@ let make = (
893894
}
894895
}
895896

896-
let loadInitialState = async (): Persistence.initialState => {
897+
let resumeInitialState = async (): Persistence.initialState => {
897898
let (cache, chains) = await Promise.all2((
898899
restoreEffectCache(~withUpload=false),
899900
sql
@@ -903,6 +904,13 @@ let make = (
903904
->(Utils.magic: promise<array<unknown>> => promise<array<InternalTable.Chains.t>>),
904905
))
905906

907+
if chains->Utils.Array.notEmpty {
908+
let () =
909+
await sql->Postgres.unsafe(
910+
InternalTable.DynamicContractRegistry.makeCleanUpOnRestartQuery(~pgSchema, ~chains),
911+
)
912+
}
913+
906914
{
907915
cleanRun: false,
908916
cache,
@@ -913,7 +921,7 @@ let make = (
913921
{
914922
isInitialized,
915923
initialize,
916-
loadInitialState,
924+
resumeInitialState,
917925
loadByFieldOrThrow,
918926
loadByIdsOrThrow,
919927
setOrThrow,

codegenerator/cli/npm/envio/src/Throttler.res

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ let rec startInternal = async (throttler: t) => {
3030
switch await fn() {
3131
| exception exn =>
3232
throttler.logger->Pino.errorExn(
33-
Pino.createPinoMessageWithError("Scheduled action failed in throttler", exn),
33+
Pino.createPinoMessageWithError(
34+
"Scheduled action failed in throttler",
35+
exn->Utils.prettifyExn,
36+
),
3437
)
3538
| _ => ()
3639
}

codegenerator/cli/npm/envio/src/Utils.res

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,13 @@ module Dict = {
130130
let shallowCopy: dict<'a> => dict<'a> = %raw(`(dict) => ({...dict})`)
131131

132132
let size = dict => dict->Js.Dict.keys->Js.Array2.length
133+
134+
// Sets the value stored under an integer key (compiles to `dict[key] = value`).
@set_index
external setByInt: (dict<'a>, int, 'a) => unit = ""

// Increments the counter stored under an integer key.
// A missing key is treated as 0, so the first increment stores 1
// (a bare `dict[key]++` on a missing key would store NaN).
let incrementByInt: (dict<int>, int) => unit = %raw(`(dict, key) => {
  dict[key] = (dict[key] || 0) + 1
}`)
133140
}
134141

135142
module Math = {

codegenerator/cli/npm/envio/src/bindings/Postgres.res

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type poolConfig = {
7878
// types?: array<'a>, // Array of custom types, see more below (default: [])
7979
onnotice?: string => unit, // Default console.log, set false to silence NOTICE (default: fn)
8080
onParameter?: (string, string) => unit, // (key, value) when server param change (default: fn)
81-
debug?: 'a. 'a => unit, //(connection, query, params, types) => unit, // Is called with (connection, query, params, types) (default: fn)
81+
debug?: (~connection: unknown, ~query: unknown, ~params: unknown, ~types: unknown) => unit, // Is called with (connection, query, params, types)
8282
socket?: unit => unit, // fn returning custom socket to use (default: fn)
8383
transform?: transformConfig,
8484
connection?: connectionConfig,

codegenerator/cli/npm/envio/src/db/InternalTable.gen.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,6 @@ import type {Json_t as Js_Json_t} from '../../src/Js.shim';
77

88
import type {t as Address_t} from '../../src/Address.gen';
99

10-
export type EventSyncState_t = {
11-
readonly chain_id: number;
12-
readonly block_number: number;
13-
readonly log_index: number;
14-
readonly block_timestamp: number
15-
};
16-
1710
export type RawEvents_t = {
1811
readonly chain_id: number;
1912
readonly event_id: bigint;

0 commit comments

Comments
 (0)