Skip to content
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -691,21 +691,34 @@ select-docs-prod:
make select-docs url=https://docs.happy.tech posthog=true
.PHONY: select-docs-prod

# Use the local setup (chain & all services to run locally).
select-all-local: select-chain-local select-submitter-local select-iframe-local select-auth-local select-docs-local
.PHONY: select-all-local

# Use the staging setup — this is useful as a base for other rules & for scripts.
select-all-staging: select-chain-staging select-submitter-staging select-iframe-staging select-auth-staging select-docs-staging
.PHONY: select-all-staging

# Use the prod setup — this is useful for bundling/publishing packages, and as a base for other rules & for scripts.
select-all-prod: select-chain-prod select-submitter-prod select-iframe-prod select-auth-prod select-docs-prod
.PHONY: select-all-prod

# Use the staging setup, but run the iframe locally.
select-iframe-dev-staging: select-all-staging select-iframe-local
.PHONY: select-iframe-dev-staging

# Use the prod setup, but run the iframe locally.
select-iframe-dev-prod: select-all-prod select-iframe-local
.PHONY: select-iframe-dev-prod

# Use the staging setup, but run the iframe and the submitter locally.
select-submitter-dev-staging: select-iframe-dev-staging select-submitter-staging
.PHONY: select-submitter-dev-staging

# Use the prod setup, but run the iframe and submitter locally.
select-submitter-dev-prod: select-iframe-dev-prod select-submitter-prod
.PHONY: select-submitter-dev-prod

setup-local-chain: select-chain-local
@cd contracts && make anvil-background
@until cast chain-id; do sleep 1; done;
Expand Down
1 change: 1 addition & 0 deletions apps/submitter/build.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export default defineConfig([
exports: [".", "./migrate"],
bunConfig: {
target: "bun",
minify: false,
},
},
{
Expand Down
84 changes: 54 additions & 30 deletions apps/submitter/lib/services/BlockService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ const checkBlock = type({
*/
export type Block = typeof checkBlock.infer

const TIMEOUT_MSG = "Timed out while waiting for block"

// Note there is an extra block type: `RpcBlock` from Viem representing an raw block from RPC, which we get on
// our WebSocket subscription. We normalize those to `InputBlock`.

const TIMEOUT_MSG = "Timed out while waiting for block"

/**
* This service is represonsible for fetching block information (via subscription or polling depending on the RPC),
* and allows other services to subscribe to blocks via {@link #onBlock}.
*/
export class BlockService {
#current?: Block
#previous?: Block
Expand All @@ -83,12 +87,15 @@ export class BlockService {
/** Call this to unwind the current block subscription and skip to the next client. */
#skipToNextClient!: RejectType

/** Timeout for receiving a block. Private so it can be disabled in tests. */
/** Timeout for receiving a block. Uses the `private` keyword so it can be disabled in tests. */
private blockTimeout: Timer | undefined = undefined

// =================================================================================================================
// PUBLIC METHODS

/**
* Blocks until the block service is properly initialized (after calling {@link #start}.
*/
async waitForInitialization(): Promise<void> {
if (this.#current) return

Expand All @@ -115,7 +122,11 @@ export class BlockService {
return this.#current
}

/** Register a callback on the current block. */
/**
* Register a callback to be invoked on new blocks.
* The callback is immediately invoked on the current block, unless {@link skipInitial} is provided.
* Returns an unsubscribe function.
*/
onBlock(callback: (block: Block) => void, skipInitial?: "skipInitial"): () => void {
this.#callbacks.add(callback)
if (!skipInitial && this.#current) callback(this.#current)
Expand All @@ -136,7 +147,11 @@ export class BlockService {
* - If there are none, we drop the "no recent failures" requirement.
*/
async #nextRPC(): Promise<void> {
// === Helper function ===
// Current block = most up to date block known.
// This might get reset to a block with a lower number if we detect a re-org.
let current = this.#current ?? { number: 0n, hash: "0x" as Hash }

// === Helper functions ===

function createClient(url: string, timeout = env.RPC_REQUEST_TIMEOUT): PublicClient {
const isWs = url.startsWith("ws")
Expand All @@ -147,24 +162,34 @@ export class BlockService {
/** Get latest block from all RPCs. */
async function pingRpcsForBlock(timeout: number): Promise<PromiseSettledResult<InputBlock>[]> {
const promises = rpcUrls.map((url) => createClient(url, timeout).getBlock({ includeTransactions: false }))
// Note that if a WebSocket RPC is down, some exceptions can escape to the top-level here.
// Nothing we can do about it, Viem doesn't catch them. It's benign however.
// Note that if a WebSocket RPC is down, some exceptions can escape to the console here.
// Nothing we can do about it, Viem doesn't catch them and we can't either (they're in promises), but
// it's benign as they are simply logged and do not terminate the process.
return await Promise.allSettled(promises)
}

/** Can be applied to {@link pingRpcsForBlock} result to select successful calls. */
/** Can be applied to items of the {@link pingRpcsForBlock} result to select successful calls. */
function isSuccess(result: PromiseSettledResult<InputBlock>): result is PromiseFulfilledResult<Block> {
if (result.status !== "fulfilled") return false
return !(checkBlock(result.value) instanceof ArkErrors)
}

/**
* Can be applied to items of the {@link pingRpcsForBlock} result to select calls that are successful
* and yield a block whose number exceed the current block.
*/
function isProgress(result: PromiseSettledResult<InputBlock>): result is PromiseFulfilledResult<Block> {
return isSuccess(result) && result.value.number > current.number
}

// === Find live RPCs ===

if (this.#rpcUrl) {
// We had a RPC but we're entering RPC selection, meaning the RPC failed.
this.#recentlyFailedRpcs.add(this.#rpcUrl)
// Don't need to clear. The worse that can happen is it will be removed a bit early
// No need to clear the timeout. The worse that can happen is it will be removed a bit early
// if re-added after the failed set was cleared following a block production stall.
// Extremely rare scenario that doesn't break anything. Not worth the extra bookkeeping.
// Extremely rare scenario that doesn't jeopardize correctness. Not worth the extra bookkeeping.
setTimeout(() => this.#recentlyFailedRpcs.delete(this.#rpcUrl), env.RPC_TIMED_OUT_PERIOD)
}

Expand All @@ -187,15 +212,13 @@ export class BlockService {

// === Check to see if block production has halted ===

// Check `this.#client` to avoid waiting & logging an error on initial RPC selection.
const halted = !rpcResults.some((it) => isSuccess(it) && it.value.number > (this.#current?.number ?? 0n))
if (this.#client && halted) {
const halted = !rpcResults.some(isProgress)
if (halted) {
// This might trigger at the start of testing and is benign, it just means the RPC isn't spun up yet.
const message = "Block production has halted, waiting for it to resume."
blockLogger.error(message)
sendAlert(message, AlertType.BLOCK_PRODUCTION_HALTED)
const { promise, resolve } = promiseWithResolvers()
let current = this.#current ?? { number: 0n, hash: "0x" as Hash }
const pollingTimer = setInterval(async () => {
rpcResults = await pingRpcsForBlock(env.RPC_REQUEST_TIMEOUT)
// The block amongst the result with the higher number.
Expand All @@ -209,7 +232,8 @@ export class BlockService {
localMax = r.value
}
}
if (localMax.number && this.#blockHistory.get(localMax.number) !== localMax.hash) {
const oldBlockHash = this.#blockHistory.get(localMax.number)
if (oldBlockHash && oldBlockHash !== localMax.hash) {
// A re-org might have occured — reset block number and check for forward movement from there.
current = localMax
}
Expand All @@ -222,11 +246,11 @@ export class BlockService {

// === Select RPC ===

// Get most prioritary alive RPC, excluding recently failed ones.
let index = rpcResults.findIndex((it, i) => isSuccess(it) && !this.#recentlyFailedRpcs.has(rpcUrls[i]))
// Get most prioritary alive RPC that made progress, excluding recently failed ones.
let index = rpcResults.findIndex((it, i) => isProgress(it) && !this.#recentlyFailedRpcs.has(rpcUrls[i]))
if (index < 0) {
blockLogger.error("Every alive RPC has failed within the last minute, but some RPCs are live.")
index = rpcResults.findIndex(isSuccess) // we know this must be > 0
index = rpcResults.findIndex(isProgress) // we know this must be > 0
}

this.#rpcUrl = rpcUrls[index]
Expand All @@ -236,14 +260,14 @@ export class BlockService {
// We got a new block in the whole affair, handle it.
// This is always ok: this is either a more recent block or a re-org occured.
const newBlock = (rpcResults[index] as PromiseFulfilledResult<Block>).value
if (!this.#current || this.#current.number < newBlock.number) this.#handleNewBlock(newBlock)
this.#handleNewBlock(newBlock)
}

// Note that in the case of re-orgs, we will be missing blocks compared to the "most re-orged"
// block that we saw. We should handle that but don't sweat too much about it right now. We
// haven't really audited the code for re-orgs. In theory, the Viem nonce manager should handle
// EVM tx nonces. Boop nonces get a resync via `InvalidNonce`, and the nonce cache expires fast
// anyway. We might suffer from stuck transactions in the future (meant for pre-re-org chain).
// Note that in the case of re-orgs, we might be missing blocks compared to the "most re-orged" block that we saw.
// We should handle that but don't sweat too much about it right now. We haven't really audited the code for
// re-orgs. Our nonce manager should handle EVM tx nonces. Boop nonces get a resync via `InvalidNonce`, and the
// nonce cache expires fast anyway. We might suffer from stuck transactions in the future (meant for pre-re-org
// chain).
//
// The most immediate to-do item is to call out the resync system to cancel submit and
// createAccount transactions, which we need for robustness.
Expand All @@ -255,7 +279,7 @@ export class BlockService {
// BLOCK MONITORING

async start(): Promise<void> {
// Generic logic first retry is instant, then `initialRetryDelay` with exponential backoff up
// Generic logic: first retry is instant, then `initialRetryDelay` with exponential backoff up
// to `maxRetryDelay` with a max of `maxAttempts` attempts.
// However, default values only retries once without delay, otherwise we'd rather move on to another RPC than
// waste time waiting.
Expand Down Expand Up @@ -399,12 +423,12 @@ export class BlockService {
})
if (txHashes.length > 0) {
rpcBlock.transactions = txHashes
// Note that rpcBlock cannot throw if its input is and object.
// Note that #handleNewBlock cannot throw if its input is and object.
// cast: allows `number` to be null
await this.#handleNewBlock(formatBlock(rpcBlock) as Block)
return
}
// It may be that the block simplify has no transactions, but out of an abundance of caution, we'll try
// It may be that the block simply has no transactions, but out of an abundance of caution, we'll try
// fetching the block anyway. If there's truly no transactions here, there will be no harm.
}

Expand Down Expand Up @@ -520,7 +544,7 @@ export class BlockService {
// Use a Mutex to avoid backfilling the same range many times.
return this.#backfillMutex.locked(async () => {
// It's possible all or part of the range was backfilled while we were waiting.
if (this.#current?.number ?? 0n > from) from = this.#current!.number
if ((this.#current?.number ?? 0n) > from) from = this.#current!.number
if (from >= to) return true

blockLogger.info(`Backfilling blocks in [${from}, ${to}] (inclusive).`)
Expand All @@ -535,9 +559,9 @@ export class BlockService {
// Filled in meanwhile, move forward.
if (this.#current?.number ?? 0n > blockNumber) continue
const block = await promises[Number(blockNumber - from)]
if (!block) throw "Block was undefined" // this shouldn't happen
if (!block) throw Error("Block was undefined") // this shouldn't happen
// Disallow recursive backfills. Should never happen, but theoretically possible with reorgs.
if (!(await this.#handleNewBlock(block, false))) throw "Block was skipped."
if (!(await this.#handleNewBlock(block, false))) throw Error("Block was skipped.")
} catch (e) {
// If the block was filled meanwhile, we can move forward.
if (this.#current?.number ?? 0n > blockNumber) continue
Expand Down