Skip to content

Commit a021fd1

Browse files
Improve chain priority logic (#690)
* Improve chain priority logic * Update scenarios/test_codegen/test/lib_tests/FetchState_test.res * Update performance in the readme * Improve hasFullBatch logic * Fix Hyperliquid chain id --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 5860866 commit a021fd1

File tree

6 files changed

+149
-71
lines changed

6 files changed

+149
-71
lines changed

.cursor/rules/navigation.mdc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,9 @@ Tips for navigation:
3131
- Config parsing & codegen lives in Rust. When tracking how a value reaches templates, follow `human_config.rs` → `system_config.rs` → `codegen_templates.rs`.
3232
- Prefer reading ReScript `.res` modules directly; compiled `.js` artifacts can be ignored.
3333
- When suggesting file edits to contributors, reference modules with `.res` extension but drop the long path (e.g., "edit `ChainFetcher.res`").
34+
35+
## Testing
36+
37+
Prefer Public module API for testing.
38+
39+
Verify that tests pass by running a compiler `pnpm rescript` and tests `pnpm mocha`. Use `_only` to specify which tests to run.

codegenerator/cli/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ HyperIndex is a fast, developer-friendly multichain indexer, optimized for both
1212
## Key Features
1313

1414
- **[Indexer auto-generation](https://docs.envio.dev/docs/HyperIndex/contract-import)** – Generate Indexers directly from smart contract addresses
15-
- **High performance** – Historical backfills at over 5,000+ events per second ([fastest in market](https://docs.envio.dev/blog/indexer-benchmarking-results))
15+
- **High performance** – Historical backfills at over 10,000+ events per second ([fastest in market](https://docs.envio.dev/blog/indexer-benchmarking-results))
1616
- **Local development** – Full-featured local environment with Docker
1717
- **[Multichain indexing](https://docs.envio.dev/docs/HyperIndex/multichain-indexing)** – Index any EVM-compatible blockchain and Fuel (simultaneously)
1818
- **Real-time indexing** – Instantly track blockchain events

codegenerator/cli/npm/envio/src/FetchState.res

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,14 +1235,34 @@ let filterAndSortForUnorderedBatch = {
12351235
}
12361236
}
12371237

1238-
let compareUnorderedBatchChainPriority = (a: t, b: t) => {
1239-
// Use unsafe since we filtered out all queues without batch items
1240-
(a.queue->Utils.Array.lastUnsafe).timestamp - (b.queue->Utils.Array.lastUnsafe).timestamp
1238+
let hasFullBatch = ({queue, latestFullyFetchedBlock}: t, ~maxBatchSize) => {
1239+
// Queue is ordered from latest to earliest, so the earliest eligible
1240+
// item for a full batch of size B is at index (length - B).
1241+
// Do NOT subtract an extra 1 here; when length === B we should still
1242+
// classify the queue as full and probe index 0.
1243+
let targetBlockIdx = queue->Array.length - maxBatchSize
1244+
if targetBlockIdx < 0 {
1245+
false
1246+
} else {
1247+
// Unsafe can fail when maxBatchSize is 0,
1248+
// but we ignore the case
1249+
(queue->Js.Array2.unsafe_get(targetBlockIdx)).blockNumber <=
1250+
latestFullyFetchedBlock.blockNumber
1251+
}
12411252
}
12421253

1243-
(fetchStates: array<t>) => {
1254+
(fetchStates: array<t>, ~maxBatchSize: int) => {
12441255
fetchStates
12451256
->Array.keepU(hasBatchItem)
1246-
->Js.Array2.sortInPlaceWith(compareUnorderedBatchChainPriority)
1257+
->Js.Array2.sortInPlaceWith((a: t, b: t) => {
1258+
switch (a->hasFullBatch(~maxBatchSize), b->hasFullBatch(~maxBatchSize)) {
1259+
| (true, true)
1260+
| (false, false) =>
1261+
// Use unsafe since we filtered out all queues without batch items
1262+
(a.queue->Utils.Array.lastUnsafe).timestamp - (b.queue->Utils.Array.lastUnsafe).timestamp
1263+
| (true, false) => -1
1264+
| (false, true) => 1
1265+
}
1266+
})
12471267
}
12481268
}

codegenerator/cli/src/config_parsing/chain_helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ pub enum Network {
194194
Holesky = 17000,
195195

196196
#[subenum(HypersyncNetwork)]
197-
Hyperliquid = 645749,
197+
Hyperliquid = 999,
198198

199199
IncoGentryTestnet = 9090,
200200

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ let createUnorderedBatch = (
185185
let preparedFetchStates =
186186
fetchStates
187187
->ChainMap.values
188-
->FetchState.filterAndSortForUnorderedBatch
188+
->FetchState.filterAndSortForUnorderedBatch(~maxBatchSize)
189189

190190
let idx = ref(0)
191191
let preparedNumber = preparedFetchStates->Array.length

scenarios/test_codegen/test/lib_tests/FetchState_test.res

Lines changed: 115 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1818,12 +1818,10 @@ describe("FetchState.getNextQuery & integration", () => {
18181818

18191819
describe("FetchState unit tests for specific cases", () => {
18201820
it("Should merge events in correct order on merging", () => {
1821-
let normalSelection: FetchState.selection = {
1822-
dependsOnAddresses: true,
1823-
eventConfigs: [],
1824-
}
1825-
let fetchState: FetchState.t = {
1826-
partitions: [
1821+
let base = makeInitial()
1822+
let normalSelection = base.normalSelection
1823+
let fetchState = base->FetchState.updateInternal(
1824+
~partitions=[
18271825
{
18281826
id: "0",
18291827
status: {fetchingStateId: None},
@@ -1845,32 +1843,15 @@ describe("FetchState unit tests for specific cases", () => {
18451843
addressesByContractName: Js.Dict.empty(),
18461844
},
18471845
],
1848-
nextPartitionIndex: 2,
1849-
isFetchingAtHead: false,
1850-
maxAddrInPartition: 2,
1851-
latestFullyFetchedBlock: {
1852-
blockNumber: 1,
1853-
blockTimestamp: 0,
1854-
},
1855-
queue: [
1846+
~nextPartitionIndex=2,
1847+
~queue=[
18561848
mockEvent(~blockNumber=4, ~logIndex=2),
18571849
mockEvent(~blockNumber=4),
18581850
mockEvent(~blockNumber=3),
18591851
mockEvent(~blockNumber=2),
18601852
mockEvent(~blockNumber=1),
18611853
],
1862-
firstEventBlockNumber: Some(1),
1863-
endBlock: undefined,
1864-
normalSelection,
1865-
chainId,
1866-
indexingContracts: Js.Dict.empty(),
1867-
contractConfigs: Js.Dict.fromArray([
1868-
("Gravatar", {FetchState.filterByAddresses: false}),
1869-
("NftFactory", {FetchState.filterByAddresses: false}),
1870-
]),
1871-
dcsToStore: None,
1872-
blockLag: 0,
1873-
}
1854+
)
18741855

18751856
let updatedFetchState =
18761857
fetchState
@@ -2139,16 +2120,10 @@ describe("FetchState unit tests for specific cases", () => {
21392120

21402121
it("Get earliest event", () => {
21412122
let latestFetchedBlock = getBlockData(~blockNumber=500)
2142-
2143-
let normalSelection: FetchState.selection = {
2144-
dependsOnAddresses: true,
2145-
eventConfigs: [
2146-
(Mock.evmEventConfig(~id="0", ~contractName="ContractA") :> Internal.eventConfig),
2147-
],
2148-
}
2149-
2150-
let fetchState: FetchState.t = {
2151-
partitions: [
2123+
let base = makeInitial()
2124+
let normalSelection = base.normalSelection
2125+
let fetchState = base->FetchState.updateInternal(
2126+
~partitions=[
21522127
{
21532128
id: "0",
21542129
status: {fetchingStateId: None},
@@ -2164,27 +2139,13 @@ describe("FetchState unit tests for specific cases", () => {
21642139
addressesByContractName: Js.Dict.empty(),
21652140
},
21662141
],
2167-
nextPartitionIndex: 2,
2168-
isFetchingAtHead: false,
2169-
maxAddrInPartition: 2,
2170-
latestFullyFetchedBlock: latestFetchedBlock,
2171-
queue: [
2142+
~nextPartitionIndex=2,
2143+
~queue=[
21722144
mockEvent(~blockNumber=6, ~logIndex=1),
21732145
mockEvent(~blockNumber=5),
21742146
mockEvent(~blockNumber=2, ~logIndex=1),
21752147
],
2176-
firstEventBlockNumber: Some(1),
2177-
endBlock: undefined,
2178-
normalSelection,
2179-
chainId,
2180-
indexingContracts: Js.Dict.empty(),
2181-
contractConfigs: Js.Dict.fromArray([
2182-
("Gravatar", {FetchState.filterByAddresses: false}),
2183-
("NftFactory", {FetchState.filterByAddresses: false}),
2184-
]),
2185-
dcsToStore: None,
2186-
blockLag: 0,
2187-
}
2148+
)
21882149

21892150
Assert.deepEqual(
21902151
fetchState->FetchState.getEarliestEvent->getItem,
@@ -2667,7 +2628,55 @@ describe("Test queue item", () => {
26672628
})
26682629

26692630
describe("FetchState.filterAndSortForUnorderedBatch", () => {
2670-
it("Filters out states without eligible items and sorts by earliest timestamp (public API)", () => {
2631+
it(
2632+
"Filters out states without eligible items and sorts by earliest timestamp (public API)",
2633+
() => {
2634+
let mk = () => makeInitial()
2635+
let mkQuery = (fetchState: FetchState.t) => {
2636+
{
2637+
FetchState.partitionId: "0",
2638+
target: Head,
2639+
selection: fetchState.normalSelection,
2640+
addressesByContractName: Js.Dict.empty(),
2641+
fromBlock: 0,
2642+
indexingContracts: fetchState.indexingContracts,
2643+
}
2644+
}
2645+
2646+
// Helper: create a fetch state with desired latestFetchedBlock and queue items via public API
2647+
let makeFsWith = (~latestBlock: int, ~queueBlocks: array<int>): FetchState.t => {
2648+
let fs0 = mk()
2649+
let query = mkQuery(fs0)
2650+
fs0
2651+
->FetchState.handleQueryResult(
2652+
~query,
2653+
~latestFetchedBlock={blockNumber: latestBlock, blockTimestamp: latestBlock},
2654+
~reversedNewItems=queueBlocks->Array.map(b => mockEvent(~blockNumber=b)),
2655+
~currentBlockHeight=latestBlock,
2656+
)
2657+
->Result.getExn
2658+
}
2659+
2660+
// Included: last queue item at block 1, latestFullyFetchedBlock = 10
2661+
let fsEarly = makeFsWith(~latestBlock=10, ~queueBlocks=[2, 1])
2662+
// Included: last queue item at block 5, latestFullyFetchedBlock = 10
2663+
let fsLate = makeFsWith(~latestBlock=10, ~queueBlocks=[5])
2664+
// Excluded: last queue item at block 11 (> latestFullyFetchedBlock = 10)
2665+
let fsExcluded = makeFsWith(~latestBlock=10, ~queueBlocks=[11])
2666+
2667+
let prepared = FetchState.filterAndSortForUnorderedBatch(
2668+
[fsLate, fsExcluded, fsEarly],
2669+
~maxBatchSize=3,
2670+
)
2671+
2672+
Assert.deepEqual(
2673+
prepared->Array.map(fs => (fs.queue->Utils.Array.last->Option.getUnsafe).blockNumber),
2674+
[1, 5],
2675+
)
2676+
},
2677+
)
2678+
2679+
it("Prioritizes full batches over half full ones", () => {
26712680
let mk = () => makeInitial()
26722681
let mkQuery = (fetchState: FetchState.t) => {
26732682
{
@@ -2680,7 +2689,6 @@ describe("FetchState.filterAndSortForUnorderedBatch", () => {
26802689
}
26812690
}
26822691

2683-
// Helper: create a fetch state with desired latestFetchedBlock and queue items via public API
26842692
let makeFsWith = (~latestBlock: int, ~queueBlocks: array<int>): FetchState.t => {
26852693
let fs0 = mk()
26862694
let query = mkQuery(fs0)
@@ -2694,18 +2702,62 @@ describe("FetchState.filterAndSortForUnorderedBatch", () => {
26942702
->Result.getExn
26952703
}
26962704

2697-
// Included: last queue item at block 1, latestFullyFetchedBlock = 10
2698-
let fsEarly = makeFsWith(~latestBlock=10, ~queueBlocks=[2, 1])
2699-
// Included: last queue item at block 5, latestFullyFetchedBlock = 10
2700-
let fsLate = makeFsWith(~latestBlock=10, ~queueBlocks=[5])
2701-
// Excluded: last queue item at block 11 (> latestFullyFetchedBlock = 10)
2702-
let fsExcluded = makeFsWith(~latestBlock=10, ~queueBlocks=[11])
2705+
// Full batch (>= maxBatchSize items). Make it later (earliest item at block 7)
2706+
let fsFullLater = makeFsWith(~latestBlock=10, ~queueBlocks=[9, 8, 7])
2707+
// Half-full batch (1 item) but earlier earliest item (block 1)
2708+
let fsHalfEarlier = makeFsWith(~latestBlock=10, ~queueBlocks=[1])
27032709

2704-
let prepared = [fsLate, fsExcluded, fsEarly]->FetchState.filterAndSortForUnorderedBatch
2710+
let prepared = FetchState.filterAndSortForUnorderedBatch(
2711+
[fsHalfEarlier, fsFullLater],
2712+
~maxBatchSize=2,
2713+
)
2714+
2715+
Assert.deepEqual(
2716+
prepared->Array.map(fs => (fs.queue->Utils.Array.last->Option.getUnsafe).blockNumber),
2717+
[7, 1],
2718+
)
2719+
})
2720+
2721+
it("Treats exactly-full batches as full", () => {
2722+
let mk = () => makeInitial()
2723+
let mkQuery = (fetchState: FetchState.t) => {
2724+
{
2725+
FetchState.partitionId: "0",
2726+
target: Head,
2727+
selection: fetchState.normalSelection,
2728+
addressesByContractName: Js.Dict.empty(),
2729+
fromBlock: 0,
2730+
indexingContracts: fetchState.indexingContracts,
2731+
}
2732+
}
2733+
2734+
let makeFsWith = (~latestBlock: int, ~queueBlocks: array<int>): FetchState.t => {
2735+
let fs0 = mk()
2736+
let query = mkQuery(fs0)
2737+
fs0
2738+
->FetchState.handleQueryResult(
2739+
~query,
2740+
~latestFetchedBlock={blockNumber: latestBlock, blockTimestamp: latestBlock},
2741+
~reversedNewItems=queueBlocks->Array.map(b => mockEvent(~blockNumber=b)),
2742+
~currentBlockHeight=latestBlock,
2743+
)
2744+
->Result.getExn
2745+
}
2746+
2747+
// Exactly full (== maxBatchSize items)
2748+
let fsExactFull = makeFsWith(~latestBlock=10, ~queueBlocks=[3, 2])
2749+
// Half-full (1 item) but earlier earliest item
2750+
let fsHalfEarlier = makeFsWith(~latestBlock=10, ~queueBlocks=[1])
2751+
2752+
let prepared = FetchState.filterAndSortForUnorderedBatch(
2753+
[fsHalfEarlier, fsExactFull],
2754+
~maxBatchSize=2,
2755+
)
27052756

2757+
// Full batch should take priority regardless of earlier timestamp of half batch
27062758
Assert.deepEqual(
27072759
prepared->Array.map(fs => (fs.queue->Utils.Array.last->Option.getUnsafe).blockNumber),
2708-
[1, 5],
2760+
[2, 1],
27092761
)
27102762
})
27112763
})

0 commit comments

Comments
 (0)