@@ -18,9 +18,10 @@ import { AlertType } from "#lib/policies/alerting"
1818import { currentBlockGauge } from "#lib/telemetry/metrics"
1919import { LruCache } from "#lib/utils/LruCache"
2020import { recoverAlert , sendAlert } from "#lib/utils/alert"
21- import { chain , publicClient , rpcUrls , stringify } from "#lib/utils/clients"
2221import { blockLogger } from "#lib/utils/logger"
2322import { Bytes } from "#lib/utils/validation/ark"
23+ import { stringify } from "#lib/utils/viem"
24+ import { chain , publicClient , rpcUrls } from "./clients"
2425
2526/**
2627 * Type of block we get from Viem's `getBlock` — made extra permissive for safety,
@@ -65,8 +66,6 @@ const TIMEOUT_MSG = "Timed out while waiting for block"
6566export class BlockService {
6667 #current?: Block
6768 #previous?: Block
68- #client! : PublicClient
69- #backfillMutex = new Mutex ( )
7069 #callbacks: Set < ( block : Block ) => void | Promise < void > > = new Set ( )
7170
7271 /** Zero-index attempt number for the current client. */
@@ -75,6 +74,9 @@ export class BlockService {
7574 /** Current RPC URL (a value from {@link rpcUrls}) */
7675 #rpcUrl = ""
7776
77+ /** RPC URls besides {@link #rpcUrl} that are live and keeping up with blocks, as of the latest RPC selection. */
78+ #otherLiveRpcUrls: string [ ] = rpcUrls
79+
7880 /** Set of RPCs that failed in the last minute, we will prioritize selecting a RPC not in this set if possible. */
7981 #recentlyFailedRpcs = new Set < string > ( )
8082
@@ -117,6 +119,14 @@ export class BlockService {
117119 }
118120 }
119121
122+ getRpcUrl ( ) : string {
123+ return this . #rpcUrl
124+ }
125+
126+ getOtherLiveRpcUrls ( ) : string [ ] {
127+ return this . #otherLiveRpcUrls
128+ }
129+
120130 getCurrentBlock ( ) : Block {
121131 if ( ! this . #current) throw Error ( "BlockService not initialized!" )
122132 return this . #current
@@ -137,7 +147,7 @@ export class BlockService {
137147 // RPC SELECTION
138148
139149 /**
140- * Select a new RPC service and sets {@link this.#client} to a client for it .
150+ * Select a new RPC service and sets {@link this.#rpcUrl} .
141151 *
142152 * Sketch of the process:
143153 * - Ping all RPCs for latest block to determine who is alive.
@@ -254,7 +264,9 @@ export class BlockService {
254264 }
255265
256266 this . #rpcUrl = rpcUrls [ index ]
257- this . #client = createClient ( this . #rpcUrl)
267+ this . #otherLiveRpcUrls = rpcResults
268+ . map ( ( it , i ) => ( isProgress ( it ) && rpcUrls [ i ] !== this . #rpcUrl ? rpcUrls [ i ] : null ) )
269+ . filter ( ( i ) => i !== null )
258270 this . #attempt = 0
259271
260272 // We got a new block in the whole affair, handle it.
@@ -293,21 +305,21 @@ export class BlockService {
293305 while ( true ) {
294306 // 1. Initialize next client if needed, or wait until next attempt.
295307 init: try {
296- if ( ! this . #client /* very first init */ || skipToNextClient ) {
308+ if ( ! this . #rpcUrl /* very first init */ || skipToNextClient ) {
297309 await this . #nextRPC( )
298310 break init
299311 }
300312
301313 if ( this . #attempt >= maxAttempts ) {
302- blockLogger . warn ( `Max retries (${ maxAttempts } ) reached for ${ this . #client . name } .` )
314+ blockLogger . warn ( `Max retries (${ maxAttempts } ) reached for ${ this . #rpcUrl } .` )
303315 await this . #nextRPC( )
304316 break init
305317 }
306318
307319 if ( this . #attempt > 1 ) {
308320 // We want first retry (attempt = 1) to be instant.
309321 const delay = Math . min ( baseDelay * 2 ** ( this . #attempt - 2 ) , maxDelay )
310- blockLogger . info ( `Waiting ${ delay / 1000 } seconds to retry with ${ this . #client . name } ` )
322+ blockLogger . info ( `Waiting ${ delay / 1000 } seconds to retry with ${ this . #rpcUrl } ` )
311323 await sleep ( delay )
312324 }
313325 } catch ( e ) {
@@ -316,8 +328,8 @@ export class BlockService {
316328 continue
317329 }
318330
319- const client = this . #client . name
320- blockLogger . info ( `Starting block watcher with ${ client } (Attempt ${ this . #attempt + 1 } / ${ maxAttempts } ). ` )
331+ const attemptString = `Attempt ${ this . #attempt + 1 } / ${ maxAttempts } `
332+ blockLogger . info ( `Starting block watcher with ${ this . #rpcUrl } ( ${ attemptString } ) ` )
321333
322334 // 2. Setup subscription
323335 const { promise, reject } = promiseWithResolvers < void > ( )
@@ -331,9 +343,9 @@ export class BlockService {
331343
332344 this . #startBlockTimeout( )
333345
334- if ( this . #client . transport . type === "webSocket" ) {
346+ if ( publicClient . transport . type === "webSocket" ) {
335347 try {
336- ; ( { unsubscribe } = await this . #client . transport . subscribe ( {
348+ ; ( { unsubscribe } = await publicClient . transport . subscribe ( {
337349 params : [ "newHeads" ] ,
338350 // Type is unchecked, we're being conservative with what we receive.
339351 onData : ( data ?: { result ?: Partial < RpcBlock > } ) => {
@@ -358,7 +370,7 @@ export class BlockService {
358370 } else {
359371 pollingTimer = setInterval ( async ( ) => {
360372 // biome-ignore format: terse
361- try { void this . #handleNewBlock( await this . #client . getBlock ( { includeTransactions : false } ) ) }
373+ try { void this . #handleNewBlock( await publicClient . getBlock ( { includeTransactions : false } ) ) }
362374 catch ( e ) { reject ( e ) }
363375 } , pollingInterval )
364376 }
@@ -367,7 +379,7 @@ export class BlockService {
367379 // it because we need to control the retry logic ourselves to implement RPC fallback
368380 // for subscriptions, as well as resubscriptions — two things Viem doesn't handle.
369381
370- // unwatch = this.#client .watchBlocks({
382+ // unwatch = publicClient .watchBlocks({
371383 // pollingInterval,
372384 // includeTransactions: false,
373385 // emitOnBegin: false,
@@ -384,8 +396,8 @@ export class BlockService {
384396 clearTimeout ( this . blockTimeout )
385397 // This happens more than the rest, and if the timeout persist, there will be plenty of other logs
386398 // for us to notice as the RPC will rotate.
387- if ( e === TIMEOUT_MSG ) blockLogger . info ( TIMEOUT_MSG , client )
388- else blockLogger . error ( "Block watcher error" , client , stringify ( e ) )
399+ if ( e === TIMEOUT_MSG ) blockLogger . info ( TIMEOUT_MSG , this . #rpcUrl )
400+ else blockLogger . error ( "Block watcher error" , this . #rpcUrl , stringify ( e ) )
389401 ++ this . #attempt
390402 }
391403 }
@@ -436,7 +448,7 @@ export class BlockService {
436448 for ( let i = 1 ; i <= 3 ; ++ i ) {
437449 try {
438450 // `includeTransactions: false` still gives us a list of transaction hashes
439- block = await this . #client . getBlock ( { blockNumber, includeTransactions : false } )
451+ block = await publicClient . getBlock ( { blockNumber, includeTransactions : false } )
440452 break
441453 } catch {
442454 await sleep ( env . LINEAR_RETRY_DELAY * i )
@@ -462,7 +474,7 @@ export class BlockService {
462474 // Check for duplicates
463475 if ( block . hash === this . #current?. hash ) {
464476 // Don't warn when polling, since this is expected to happen all the time.
465- if ( this . #client . transport . type === "webSocket" )
477+ if ( publicClient . transport . type === "webSocket" )
466478 blockLogger . warn ( `Received duplicate block ${ block . number } , skipping.` )
467479 return false
468480 }
@@ -507,7 +519,7 @@ export class BlockService {
507519 }
508520
509521 if ( this . #attempt > 0 ) {
510- blockLogger . info ( `Retrieved block ${ block . number } with ${ this . #client . name } . Resetting attempt count.` )
522+ blockLogger . info ( `Retrieved block ${ block . number } with ${ this . #rpcUrl } . Resetting attempt count.` )
511523 this . #attempt = 0
512524 }
513525
@@ -532,6 +544,8 @@ export class BlockService {
532544 return true
533545 }
534546
547+ #backfillMutex = new Mutex ( )
548+
535549 /**
536550 * Backfills blocks with numbers in [from, to] (inclusive).
537551 * Returns true iff the backfill is successful for the entire range.
@@ -551,7 +565,7 @@ export class BlockService {
551565
552566 const promises = [ ]
553567 for ( let blockNumber = from ; blockNumber <= to ; blockNumber ++ ) {
554- promises . push ( this . #client . getBlock ( { blockNumber, includeTransactions : false } ) )
568+ promises . push ( publicClient . getBlock ( { blockNumber, includeTransactions : false } ) )
555569 }
556570
557571 for ( let blockNumber = from ; blockNumber <= to ; blockNumber ++ ) {
0 commit comments