Skip to content

Commit 75a502f

Browse files
authored
2x faster historical sync with RPC source (#726)
* Merge event_sync_state into envio_chains * Track progressed chains * Commit block progress to the db * Fixes * Fix tests * Add resume log for reorg debugging * Improve error message on HyperSync getEvents validation * Move RPC source to npm package * Optimize RPC get_logs block intervals - 2x speed up for some providers * Prevent increasing block interval after hard max for RPC source
1 parent f5856bb commit 75a502f

File tree

8 files changed

+144
-62
lines changed

8 files changed

+144
-62
lines changed

codegenerator/cli/npm/envio/src/InternalConfig.res

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,13 @@ type chain = {
1818
contracts: array<contract>,
1919
sources: array<Source.t>,
2020
}
21+
22+
type sourceSync = {
23+
initialBlockInterval: int,
24+
backoffMultiplicative: float,
25+
accelerationAdditive: int,
26+
intervalCeiling: int,
27+
backoffMillis: int,
28+
queryTimeoutMillis: int,
29+
fallbackStallTimeout: int,
30+
}
File renamed without changes.

codegenerator/cli/npm/envio/src/sources/HyperSync.res

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,23 @@ module GetLogs = {
7676
fieldSelection,
7777
}
7878

79+
@inline
7980
let addMissingParams = (acc, fieldNames, returnedObj, ~prefix) => {
80-
fieldNames->Array.forEach(fieldName => {
81-
switch returnedObj
82-
->(Utils.magic: 'a => Js.Dict.t<unknown>)
83-
->Utils.Dict.dangerouslyGetNonOption(fieldName) {
84-
| Some(_) => ()
85-
| None => acc->Array.push(prefix ++ "." ++ fieldName)->ignore
81+
if fieldNames->Utils.Array.notEmpty {
82+
if !(returnedObj->Obj.magic) {
83+
acc->Array.push(prefix)->ignore
84+
} else {
85+
for idx in 0 to fieldNames->Array.length - 1 {
86+
let fieldName = fieldNames->Array.getUnsafe(idx)
87+
switch returnedObj
88+
->(Utils.magic: 'a => Js.Dict.t<unknown>)
89+
->Utils.Dict.dangerouslyGetNonOption(fieldName) {
90+
| Some(_) => ()
91+
| None => acc->Array.push(prefix ++ "." ++ fieldName)->ignore
92+
}
93+
}
8694
}
87-
})
95+
}
8896
}
8997

9098
//Note this function can throw an error
Lines changed: 72 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@ let getKnownBlock = (provider, blockNumber) =>
1616
}
1717
)
1818

19-
let rec getKnownBlockWithBackoff = async (~provider, ~sourceName, ~chain, ~blockNumber, ~backoffMsOnFailure) =>
19+
let rec getKnownBlockWithBackoff = async (
20+
~provider,
21+
~sourceName,
22+
~chain,
23+
~blockNumber,
24+
~backoffMsOnFailure,
25+
) =>
2026
switch await getKnownBlock(provider, blockNumber) {
2127
| exception err =>
2228
Logging.warn({
@@ -88,19 +94,24 @@ let getSuggestedBlockIntervalFromExn = {
8894
// - Optimism: "backend response too large" or "Block range is too large"
8995
// - Arbitrum: "logs matched by query exceeds limit of 10000"
9096

91-
exn =>
97+
(exn): option<(
98+
// The suggested block range
99+
int,
100+
// Whether it's the max range that the provider allows
101+
bool,
102+
)> =>
92103
switch exn {
93104
| Js.Exn.Error(error) =>
94105
try {
95106
let message: string = (error->Obj.magic)["error"]["message"]
96107
message->S.assertOrThrow(S.string)
97108

98109
// Helper to extract block range from regex match
99-
let extractBlockRange = execResult =>
110+
let extractBlockRange = (execResult, ~isMaxRange) =>
100111
switch execResult->Js.Re.captures {
101112
| [_, Js.Nullable.Value(blockRangeLimit)] =>
102113
switch blockRangeLimit->Int.fromString {
103-
| Some(blockRangeLimit) if blockRangeLimit > 0 => Some(blockRangeLimit)
114+
| Some(blockRangeLimit) if blockRangeLimit > 0 => Some(blockRangeLimit, isMaxRange)
104115
| _ => None
105116
}
106117
| _ => None
@@ -113,48 +124,49 @@ let getSuggestedBlockIntervalFromExn = {
113124
| [_, Js.Nullable.Value(fromBlock), Js.Nullable.Value(toBlock)] =>
114125
switch (fromBlock->Int.fromString, toBlock->Int.fromString) {
115126
| (Some(fromBlock), Some(toBlock)) if toBlock >= fromBlock =>
116-
Some(toBlock - fromBlock + 1)
127+
Some(toBlock - fromBlock + 1, false)
117128
| _ => None
118129
}
119130
| _ => None
120131
}
121132
| None =>
122133
// Try each provider's specific error pattern
123134
switch blockRangeLimitRegExp->Js.Re.exec_(message) {
124-
| Some(execResult) => extractBlockRange(execResult)
135+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
125136
| None =>
126137
switch alchemyRangeRegExp->Js.Re.exec_(message) {
127-
| Some(execResult) => extractBlockRange(execResult)
138+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
128139
| None =>
129140
switch cloudflareRangeRegExp->Js.Re.exec_(message) {
130-
| Some(execResult) => extractBlockRange(execResult)
141+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
131142
| None =>
132143
switch thirdwebRangeRegExp->Js.Re.exec_(message) {
133-
| Some(execResult) => extractBlockRange(execResult)
144+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
134145
| None =>
135146
switch blockpiRangeRegExp->Js.Re.exec_(message) {
136-
| Some(execResult) => extractBlockRange(execResult)
147+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
137148
| None =>
138149
switch maxAllowedBlocksRegExp->Js.Re.exec_(message) {
139-
| Some(execResult) => extractBlockRange(execResult)
150+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
140151
| None =>
141152
switch baseRangeRegExp->Js.Re.exec_(message) {
142-
| Some(_) => Some(2000)
153+
| Some(_) => Some(2000, true)
143154
| None =>
144-
switch blastPaidRegExp->Js.Re.exec_(message) {
145-
| Some(execResult) => extractBlockRange(execResult)
155+
switch blastPaidRegExp->Js.Re.exec_(message) {
156+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
146157
| None =>
147158
switch chainstackRegExp->Js.Re.exec_(message) {
148-
| Some(_) => Some(10000)
159+
| Some(_) => Some(10000, true)
149160
| None =>
150161
switch coinbaseRegExp->Js.Re.exec_(message) {
151-
| Some(execResult) => extractBlockRange(execResult)
162+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
152163
| None =>
153164
switch publicNodeRegExp->Js.Re.exec_(message) {
154-
| Some(execResult) => extractBlockRange(execResult)
165+
| Some(execResult) => extractBlockRange(execResult, ~isMaxRange=true)
155166
| None =>
156167
switch hyperliquidRegExp->Js.Re.exec_(message) {
157-
| Some(execResult) => extractBlockRange(execResult)
168+
| Some(execResult) =>
169+
extractBlockRange(execResult, ~isMaxRange=true)
158170
| None => None
159171
}
160172
}
@@ -181,15 +193,17 @@ type eventBatchQuery = {
181193
latestFetchedBlock: Ethers.JsonRpcProvider.block,
182194
}
183195

196+
let maxSuggestedBlockIntervalKey = "max"
197+
184198
let getNextPage = (
185199
~fromBlock,
186200
~toBlock,
187201
~addresses,
188202
~topicQuery,
189203
~loadBlock,
190-
~syncConfig as sc: Config.syncConfig,
204+
~syncConfig as sc: InternalConfig.sourceSync,
191205
~provider,
192-
~suggestedBlockIntervals,
206+
~mutSuggestedBlockIntervals,
193207
~partitionId,
194208
): promise<eventBatchQuery> => {
195209
//If the query hangs for longer than this, reject this promise to reduce the block interval
@@ -224,8 +238,11 @@ let getNextPage = (
224238
->Promise.race
225239
->Promise.catch(err => {
226240
switch getSuggestedBlockIntervalFromExn(err) {
227-
| Some(nextBlockIntervalTry) =>
228-
suggestedBlockIntervals->Js.Dict.set(partitionId, nextBlockIntervalTry)
241+
| Some((nextBlockIntervalTry, isMaxRange)) =>
242+
mutSuggestedBlockIntervals->Js.Dict.set(
243+
isMaxRange ? maxSuggestedBlockIntervalKey : partitionId,
244+
nextBlockIntervalTry,
245+
)
229246
raise(
230247
Source.GetItemsError(
231248
FailedGettingItems({
@@ -241,7 +258,7 @@ let getNextPage = (
241258
let executedBlockInterval = toBlock - fromBlock + 1
242259
let nextBlockIntervalTry =
243260
(executedBlockInterval->Belt.Int.toFloat *. sc.backoffMultiplicative)->Belt.Int.fromFloat
244-
suggestedBlockIntervals->Js.Dict.set(partitionId, nextBlockIntervalTry)
261+
mutSuggestedBlockIntervals->Js.Dict.set(partitionId, nextBlockIntervalTry)
245262
raise(
246263
Source.GetItemsError(
247264
Source.FailedGettingItems({
@@ -351,11 +368,8 @@ let memoGetSelectionConfig = (~chain) => {
351368
}
352369

353370
let makeThrowingGetEventBlock = (~getBlock) => {
354-
// The block fields type is a subset of Ethers.JsonRpcProvider.block so we can safely cast
355-
let blockFieldsFromBlock: Ethers.JsonRpcProvider.block => Internal.eventBlock = Utils.magic
356-
357-
async (log: Ethers.log): Internal.eventBlock => {
358-
(await getBlock(log.blockNumber))->blockFieldsFromBlock
371+
async (log: Ethers.log) => {
372+
await getBlock(log.blockNumber)
359373
}
360374
}
361375

@@ -434,7 +448,7 @@ let sanitizeUrl = (url: string) => {
434448

435449
type options = {
436450
sourceFor: Source.sourceFor,
437-
syncConfig: Config.syncConfig,
451+
syncConfig: InternalConfig.sourceSync,
438452
url: string,
439453
chain: ChainMap.Chain.t,
440454
contracts: array<Internal.evmContractConfig>,
@@ -455,7 +469,7 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options
455469

456470
let getSelectionConfig = memoGetSelectionConfig(~chain)
457471

458-
let suggestedBlockIntervals = Js.Dict.empty()
472+
let mutSuggestedBlockIntervals = Js.Dict.empty()
459473

460474
let transactionLoader = LazyLoader.make(
461475
~loaderFn=transactionHash => provider->Ethers.JsonRpcProvider.getTransaction(~transactionHash),
@@ -478,7 +492,13 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options
478492

479493
let blockLoader = LazyLoader.make(
480494
~loaderFn=blockNumber =>
481-
getKnownBlockWithBackoff(~provider, ~sourceName=name, ~chain, ~backoffMsOnFailure=1000, ~blockNumber),
495+
getKnownBlockWithBackoff(
496+
~provider,
497+
~sourceName=name,
498+
~chain,
499+
~backoffMsOnFailure=1000,
500+
~blockNumber,
501+
),
482502
~onError=(am, ~exn) => {
483503
Logging.error({
484504
"err": exn,
@@ -523,10 +543,15 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options
523543
) => {
524544
let startFetchingBatchTimeRef = Hrtime.makeTimer()
525545

526-
let suggestedBlockInterval =
527-
suggestedBlockIntervals
546+
let suggestedBlockInterval = switch mutSuggestedBlockIntervals->Utils.Dict.dangerouslyGetNonOption(
547+
maxSuggestedBlockIntervalKey,
548+
) {
549+
| Some(maxSuggestedBlockInterval) => maxSuggestedBlockInterval
550+
| None =>
551+
mutSuggestedBlockIntervals
528552
->Utils.Dict.dangerouslyGetNonOption(partitionId)
529553
->Belt.Option.getWithDefault(syncConfig.initialBlockInterval)
554+
}
530555

531556
// Always have a toBlock for an RPC worker
532557
let toBlock = switch toBlock {
@@ -554,18 +579,22 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options
554579
~loadBlock=blockNumber => blockLoader->LazyLoader.get(blockNumber),
555580
~syncConfig,
556581
~provider,
557-
~suggestedBlockIntervals,
582+
~mutSuggestedBlockIntervals,
558583
~partitionId,
559584
)
560585

561586
let executedBlockInterval = suggestedToBlock - fromBlock + 1
562587

563588
// Increase the suggested block interval only when it was actually applied
564589
// and we didn't query to a hard toBlock
565-
if executedBlockInterval >= suggestedBlockInterval {
590+
// We also don't care about it when we have a hard max block interval
591+
if (
592+
executedBlockInterval >= suggestedBlockInterval &&
593+
!(mutSuggestedBlockIntervals->Utils.Dict.has(maxSuggestedBlockIntervalKey))
594+
) {
566595
// Increase batch size going forward, but do not increase past a configured maximum
567596
// See: https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease
568-
suggestedBlockIntervals->Js.Dict.set(
597+
mutSuggestedBlockIntervals->Js.Dict.set(
569598
partitionId,
570599
Pervasives.min(
571600
executedBlockInterval + syncConfig.accelerationAdditive,
@@ -634,15 +663,19 @@ let make = ({sourceFor, syncConfig, url, chain, contracts, eventRouter}: options
634663
(
635664
{
636665
eventConfig: (eventConfig :> Internal.eventConfig),
637-
timestamp: block->Types.Block.getTimestamp,
666+
timestamp: block.timestamp,
667+
blockNumber: block.number,
638668
chain,
639-
blockNumber: block->Types.Block.getNumber,
640669
logIndex: log.logIndex,
641670
event: {
642671
chainId: chain->ChainMap.Chain.toChainId,
643672
params: decodedEvent.args,
644673
transaction,
645-
block,
674+
// Unreliably expect that the Ethers block fields match the types in HyperIndex
675+
// I assume this is wrong in some cases, so we need to fix it in the future
676+
block: block->(
677+
Utils.magic: Ethers.JsonRpcProvider.block => Internal.eventBlock
678+
),
646679
srcAddress: log.address,
647680
logIndex: log.logIndex,
648681
}->Internal.fromGenericEvent,

codegenerator/cli/templates/dynamic/codegen/src/ConfigYAML.res.hbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ type hyperFuelConfig = {endpointUrl: string}
44

55
@genType.opaque
66
type rpcConfig = {
7-
syncConfig: Config.syncConfig,
7+
syncConfig: InternalConfig.sourceSync,
88
}
99

1010
@genType

codegenerator/cli/templates/static/codegen/src/Config.res

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,6 @@ type syncConfigOptions = {
1212
fallbackStallTimeout?: int,
1313
}
1414

15-
type syncConfig = {
16-
initialBlockInterval: int,
17-
backoffMultiplicative: float,
18-
accelerationAdditive: int,
19-
intervalCeiling: int,
20-
backoffMillis: int,
21-
queryTimeoutMillis: int,
22-
fallbackStallTimeout: int,
23-
}
24-
2515
type historyFlag = FullHistory | MinHistory
2616
type rollbackFlag = RollbackOnReorg | NoRollback
2717
type historyConfig = {rollbackFlag: rollbackFlag, historyFlag: historyFlag}
@@ -36,7 +26,7 @@ let getSyncConfig = (
3626
?queryTimeoutMillis,
3727
?fallbackStallTimeout,
3828
}: syncConfigOptions,
39-
): syncConfig => {
29+
): InternalConfig.sourceSync => {
4030
let queryTimeoutMillis = queryTimeoutMillis->Option.getWithDefault(20_000)
4131
{
4232
initialBlockInterval: Env.Configurable.SyncConfig.initialBlockInterval->Option.getWithDefault(
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
open RescriptMocha
2+
3+
let testApiToken = "3dc856dd-b0ea-494f-b27e-017b8b6b7e07"
4+
5+
describe_skip("Test Hyperliquid broken transaction response", () => {
6+
Async.it("should handle broken transaction response", async () => {
7+
let page = await HyperSync.GetLogs.query(
8+
~client=HyperSyncClient.make(
9+
~url="https://645749.hypersync.xyz",
10+
~apiToken=testApiToken,
11+
~maxNumRetries=Env.hyperSyncClientMaxRetries,
12+
~httpReqTimeoutMillis=Env.hyperSyncClientTimeoutMillis,
13+
),
14+
~fromBlock=12403138,
15+
~toBlock=Some(12403139),
16+
~logSelections=[
17+
{
18+
addresses: [],
19+
topicSelections: [
20+
{
21+
topic0: [
22+
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"->EvmTypes.Hex.fromStringUnsafe,
23+
],
24+
topic1: [],
25+
topic2: [],
26+
topic3: [],
27+
},
28+
],
29+
},
30+
],
31+
~fieldSelection={
32+
log: [Address, Data, LogIndex, Topic0, Topic1, Topic2, Topic3],
33+
transaction: [Hash],
34+
},
35+
~nonOptionalBlockFieldNames=[],
36+
~nonOptionalTransactionFieldNames=["hash"],
37+
)
38+
39+
Js.log(page)
40+
})
41+
})

0 commit comments

Comments
 (0)