Skip to content

Commit d927b5d

Browse files
authored
fix: audit round 3 (#284)
* fix(GRETH-037): replace type-erased event bus with typed OnceLock - Replace OnceCell<Box<dyn Any>> with OnceLock<PipeExecLayerEventBus<EthPrimitives>> - Remove generic parameter from get_pipe_exec_layer_event_bus() - Add 120s timeout with panic on expiry (GRETH-038 combined fix) - Add TypeId assertion in tree/mod.rs pipe_run_inner as runtime safety guard - Use transmute to bridge Receiver<PipeExecLayerEvent<EthPrimitives>> to generic N - Update all call sites to remove turbofish generic parameter * Remove the duplicate `new_system_call_txn()` in `metadata_txn.rs` * add design-intent comments for epoch/execute_height atomic ordering
1 parent 2ef6731 commit d927b5d

9 files changed

Lines changed: 73 additions & 50 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/engine/tree/src/tree/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,8 +558,21 @@ where
558558
}
559559

560560
fn pipe_run_inner(mut self) {
561+
// Safety guard: assert N == EthPrimitives at runtime to prevent silent UB
562+
// from the transmute below. This is feasible because NodePrimitives has a
563+
// 'static bound, and keeps the reth upstream generic signature chain intact.
564+
assert_eq!(
565+
std::any::TypeId::of::<N>(),
566+
std::any::TypeId::of::<reth_ethereum_primitives::EthPrimitives>(),
567+
"pipe_run_inner requires N = EthPrimitives"
568+
);
561569
let pipe_event_rx =
562-
get_pipe_exec_layer_event_bus::<N>().event_rx.lock().unwrap().take().unwrap();
570+
get_pipe_exec_layer_event_bus().event_rx.lock().unwrap().take().unwrap();
571+
// Safety: The TypeId assertion above guarantees N == EthPrimitives,
572+
// so Receiver<PipeExecLayerEvent<EthPrimitives>> and Receiver<PipeExecLayerEvent<N>>
573+
// are the same type at runtime.
574+
let pipe_event_rx: std::sync::mpsc::Receiver<PipeExecLayerEvent<N>> =
575+
unsafe { std::mem::transmute(pipe_event_rx) };
563576
loop {
564577
match self.try_recv_pipe_exec_event(&pipe_event_rx) {
565578
Ok(Some(event)) => self.on_pipe_exec_event(event),

crates/pipe-exec-layer-ext-v2/event-bus/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ workspace = true
1313

1414
[dependencies]
1515
reth-primitives.workspace = true
16+
reth-ethereum-primitives.workspace = true
1617
reth-chain-state.workspace = true
1718
alloy-primitives.workspace = true
18-
once_cell.workspace = true
1919
tokio.workspace = true
2020
tracing.workspace = true

crates/pipe-exec-layer-ext-v2/event-bus/src/lib.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,38 @@
11
//! Event bus for the pipe execution layer.
22
33
use alloy_primitives::TxHash;
4-
use once_cell::sync::OnceCell;
54
use reth_chain_state::ExecutedBlockWithTrieUpdates;
5+
use reth_ethereum_primitives::EthPrimitives;
66
use reth_primitives::NodePrimitives;
7-
use std::{any::Any, thread::sleep, time::Duration};
7+
use std::{sync::OnceLock, thread::sleep, time::Duration};
88
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
99
use tracing::info;
1010

1111
/// A static instance of `PipeExecLayerEventBus` used for dispatching events.
12-
pub static PIPE_EXEC_LAYER_EVENT_BUS: OnceCell<Box<dyn Any + Send + Sync>> = OnceCell::new();
12+
/// Uses typed `OnceLock` instead of `Box<dyn Any>` to eliminate the runtime downcast
13+
/// and make type mismatches a compile-time error.
14+
pub static PIPE_EXEC_LAYER_EVENT_BUS: OnceLock<PipeExecLayerEventBus<EthPrimitives>> =
15+
OnceLock::new();
1316

1417
/// Get a reference to the global `PipeExecLayerEventBus` instance.
15-
pub fn get_pipe_exec_layer_event_bus<N: NodePrimitives>() -> &'static PipeExecLayerEventBus<N> {
16-
let mut wait_time = 0;
18+
/// Blocks until the event bus is initialized, with a maximum timeout of 120 seconds.
19+
pub fn get_pipe_exec_layer_event_bus() -> &'static PipeExecLayerEventBus<EthPrimitives> {
20+
const MAX_WAIT_SECS: u64 = 120;
21+
let start = std::time::Instant::now();
1722
loop {
18-
let event_bus = PIPE_EXEC_LAYER_EVENT_BUS
19-
.get()
20-
.map(|ext| ext.downcast_ref::<PipeExecLayerEventBus<N>>().unwrap());
21-
if let Some(event_bus) = event_bus {
22-
break event_bus;
23-
} else if wait_time % 5 == 0 {
23+
if let Some(event_bus) = PIPE_EXEC_LAYER_EVENT_BUS.get() {
24+
return event_bus;
25+
}
26+
assert!(
27+
start.elapsed().as_secs() < MAX_WAIT_SECS,
28+
"PipeExecLayerEventBus not initialized after {}s — \
29+
likely a startup ordering bug",
30+
MAX_WAIT_SECS
31+
);
32+
if start.elapsed().as_secs().is_multiple_of(5) {
2433
info!("Wait PipeExecLayerEventBus ready...");
2534
}
2635
sleep(Duration::from_secs(1));
27-
wait_time += 1;
2836
}
2937
}
3038

crates/pipe-exec-layer-ext-v2/execute/src/lib.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,21 @@ struct Core<Storage: GravityStorage> {
235235
make_canonical_barrier: Channel<u64 /* block number */, Instant>,
236236
discard_txs_tx: UnboundedSender<Vec<TxHash>>,
237237
cache: PersistBlockCache,
238+
// Ordering rationale: `Ordering::Release` (stores) / `Ordering::Acquire` (loads) is
239+
// sufficient — `SeqCst` is unnecessary. The `execute_block_barrier` Channel (backed by
240+
// `Mutex<Inner>`) enforces strict serial block execution: block N's `process()` cannot
241+
// proceed past `wait((epoch, N-1))` until block N-1 has completed and called
242+
// `notify((epoch, N-1), ...)`. Both stores sit inside this serialized critical section,
243+
// so there is only ever a single writer — no concurrent writer can interleave.
244+
//
245+
// The only concurrent readers are in the timeout branch below (`self.epoch()` /
246+
// `self.execute_height()`), which merely check for stale/duplicate blocks. Each
247+
// individual Acquire load sees the latest Release store (no staleness); the two
248+
// reads are simply not atomic *with respect to each other*, so at worst a concurrent
249+
// update between the two loads causes one extra wait-loop iteration, never an
250+
// incorrect discard. Note that even `SeqCst` would not help here — two separate
251+
// loads are never atomic as a pair regardless of ordering; only an `AtomicU128`
252+
// or a lock could provide an atomic snapshot, but neither is needed.
238253
epoch: AtomicU64,
239254
execute_height: AtomicU64,
240255
metrics: PipeExecLayerMetrics,
@@ -374,7 +389,11 @@ impl<Storage: GravityStorage> Core<Storage> {
374389
.await
375390
{
376391
Some(parent) => break parent,
377-
// Make sure the ordered blocks are idempotent
392+
// Make sure the ordered blocks are idempotent.
393+
// NOTE: Each Acquire load below sees the latest Release
394+
// store to its respective atomic. The two reads are not mutually
395+
// atomic, but a concurrent update between them at worst causes one
396+
// extra wait-loop iteration, never an incorrect discard.
378397
None => {
379398
if block_epoch < self.epoch() || block_number <= self.execute_height() {
380399
warn!(target: "PipeExecService.process",
@@ -456,8 +475,12 @@ impl<Storage: GravityStorage> Core<Storage> {
456475
new_epoch=?epoch,
457476
"new epoch"
458477
);
478+
// SAFETY: Release ordering is sufficient here — see comment on field
479+
// declarations.
459480
assert_eq!(self.epoch.fetch_max(epoch, Ordering::Release), block_epoch);
460481
}
482+
// SAFETY: Release ordering is sufficient — the execute_block_barrier
483+
// serializes writers; only the timeout branch reads these concurrently (harmlessly).
461484
assert_eq!(self.execute_height.fetch_add(1, Ordering::Release), block_number - 1);
462485
self.execute_block_barrier
463486
.notify(
@@ -1444,11 +1467,9 @@ where
14441467
};
14451468
tokio::spawn(service.run());
14461469

1447-
PIPE_EXEC_LAYER_EVENT_BUS.get_or_init(|| {
1448-
Box::new(PipeExecLayerEventBus {
1449-
event_rx: std::sync::Mutex::new(Some(event_rx)),
1450-
discard_txs: tokio::sync::Mutex::new(Some(discard_txs_rx)),
1451-
})
1470+
PIPE_EXEC_LAYER_EVENT_BUS.get_or_init(|| PipeExecLayerEventBus {
1471+
event_rx: std::sync::Mutex::new(Some(event_rx)),
1472+
discard_txs: tokio::sync::Mutex::new(Some(discard_txs_rx)),
14521473
});
14531474

14541475
PipeExecLayerApi {

crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/metadata_txn.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
//! Metadata transaction execution
22
33
use super::{
4+
new_system_call_txn,
45
types::{convert_active_validators_to_bcs, onBlockStartCall, NewEpochEvent},
56
SYSTEM_CALLER,
67
};
78
use crate::{onchain_config::BLOCK_ADDR, ExecuteOrderedBlockResult, OrderedBlock};
8-
use alloy_consensus::{constants::EMPTY_WITHDRAWALS, Header, TxLegacy, EMPTY_OMMER_ROOT_HASH};
9+
use alloy_consensus::{constants::EMPTY_WITHDRAWALS, Header, EMPTY_OMMER_ROOT_HASH};
910
use alloy_eips::{eip4895::Withdrawals, merge::BEACON_NONCE};
10-
use alloy_primitives::{Address, Bytes, Signature, TxKind, U256};
11+
use alloy_primitives::{Bytes, U256};
1112
use alloy_sol_types::{SolCall, SolEvent};
1213
use gravity_api_types::events::contract_event::GravityEvent;
1314
use gravity_primitives::get_gravity_config;
1415
use reth_chainspec::{ChainSpec, EthereumHardforks};
15-
use reth_ethereum_primitives::{Block, BlockBody, Transaction, TransactionSigned};
16-
use reth_evm::{Evm, IntoTxEnv};
16+
use reth_ethereum_primitives::{Block, BlockBody, TransactionSigned};
17+
use reth_evm::Evm;
1718
use reth_execution_types::BlockExecutionOutput;
18-
use reth_primitives::{Receipt, Recovered};
19+
use reth_primitives::Receipt;
1920
use reth_provider::BlockExecutionResult;
2021
use revm::{
2122
context::TxEnv,
@@ -208,27 +209,6 @@ pub fn transact_system_txn(
208209
(SystemTxnResult { result: result.result, txn }, result.state)
209210
}
210211

211-
/// Create a new system call transaction
212-
fn new_system_call_txn(
213-
contract: Address,
214-
nonce: u64,
215-
gas_price: u128,
216-
input: Bytes,
217-
) -> TransactionSigned {
218-
TransactionSigned::new_unhashed(
219-
Transaction::Legacy(TxLegacy {
220-
chain_id: None,
221-
nonce,
222-
gas_price,
223-
gas_limit: 30_000_000,
224-
to: TxKind::Call(contract),
225-
value: U256::ZERO,
226-
input,
227-
}),
228-
Signature::new(U256::ZERO, U256::ZERO, false),
229-
)
230-
}
231-
232212
/// Execute a metadata contract call (onBlockStart from Blocker.sol)
233213
///
234214
/// Calls Blocker.onBlockStart(proposerIndex, failedProposerIndices, timestampMicros)

crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ use alloy_consensus::{EthereumTxEnvelope, TxEip4844, TxLegacy};
102102
use alloy_primitives::{Bytes, Signature, U256};
103103
use reth_ethereum_primitives::{Transaction, TransactionSigned};
104104
use revm_primitives::TxKind;
105-
use tracing::{debug, info, warn};
105+
use tracing::{debug, info};
106106

107107
/// Construct validator transactions envelope (JWK updates and DKG transcripts)
108108
///

crates/pipe-exec-layer-ext-v2/relayer/src/blockchain_source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
data_source::{source_types, OracleData, OracleDataSource},
77
eth_client::EthHttpCli,
88
};
9-
use alloy_primitives::{hex, Address, Bytes, U256};
9+
use alloy_primitives::{Address, Bytes, U256};
1010
use alloy_rpc_types::Filter;
1111
use alloy_sol_macro::sol;
1212
use alloy_sol_types::SolEvent;
@@ -340,6 +340,7 @@ impl OracleDataSource for BlockchainEventSource {
340340
mod tests {
341341
use super::*;
342342
use crate::data_source::OracleDataSource;
343+
use alloy_primitives::hex;
343344

344345
// =========================================================================
345346
// Fixed Anvil Deployment Addresses

crates/transaction-pool/src/maintain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
183183
let pool = pool.clone();
184184
tokio::spawn(async move {
185185
let mut discard_txs_rx =
186-
get_pipe_exec_layer_event_bus::<N>().discard_txs.lock().await.take().unwrap();
186+
get_pipe_exec_layer_event_bus().discard_txs.lock().await.take().unwrap();
187187
while let Some(discard_txs) = discard_txs_rx.recv().await {
188188
debug!(target: "txpool", count=%discard_txs.len(), "discarding transactions");
189189
pool.remove_transactions(discard_txs);

0 commit comments

Comments
 (0)