Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b509314
feat: implement gap recovery mechanism for L1 watcher and use in Chai…
jonastheis Oct 30, 2025
0ea4ef7
make sure that there's no deadlock with command receiver as L1Watcher…
jonastheis Oct 30, 2025
5670af8
feat: add skipping logic for duplicate L1 messages and batch commits …
jonastheis Oct 30, 2025
ba20206
remove todo
jonastheis Oct 31, 2025
476d906
use select in watcher main loop
jonastheis Oct 31, 2025
f6eaf09
add test to test reset functionality
jonastheis Oct 31, 2025
21588bc
add test for preventing deadlock if send channel is full
jonastheis Oct 31, 2025
c907bd4
fmt
jonastheis Oct 31, 2025
10bc36c
add initial test setup
jonastheis Nov 4, 2025
51100a5
add L1WatcherHandleTrait for easier testability
jonastheis Nov 4, 2025
46c09f9
fix deadlock in test
jonastheis Nov 4, 2025
04c7e18
l1 event handling
frisitano Nov 4, 2025
c1a0500
l1 event handling
frisitano Nov 4, 2025
b96bda5
add testing of gap recovery for batch
jonastheis Nov 4, 2025
ccad3cb
clean up
frisitano Nov 4, 2025
abcc90b
fix lint
jonastheis Nov 5, 2025
937b0e0
fix watcher tests
jonastheis Nov 5, 2025
f15ffb9
add possibility to filter by processed to get_batch_by_index
jonastheis Nov 5, 2025
02fb909
make test easier to debug by failing instead of hanging
jonastheis Nov 5, 2025
49d38e5
Revert "add possibility to filter by processed to get_batch_by_index"
jonastheis Nov 5, 2025
6a23c25
address review comments
jonastheis Nov 5, 2025
b0e1e94
add test cases
frisitano Nov 5, 2025
dce07df
embed L1Notification channel receiver inside of the L1WatcherHandle
jonastheis Nov 6, 2025
524304a
cleanup
frisitano Nov 6, 2025
0493ec9
Merge branch 'main' into feat/l1-reorg
frisitano Nov 6, 2025
47d35e7
Merge remote-tracking branch 'origin/feat/l1-reorg' into feat/self-he…
jonastheis Nov 12, 2025
f4a999e
fixes after merge
jonastheis Nov 12, 2025
c59007d
address feedback
frisitano Nov 12, 2025
53d0923
use alloc String
frisitano Nov 12, 2025
de57dc4
use alloc ToString
frisitano Nov 12, 2025
08ca239
Merge branch 'main' into feat/l1-reorg
frisitano Nov 12, 2025
41b3ead
add l1_watcher_command_rx to addons for testing like l1_watcher_tx
jonastheis Nov 12, 2025
90fc085
Merge remote-tracking branch 'origin/feat/l1-reorg' into feat/self-he…
jonastheis Nov 12, 2025
9865e89
move checks into respective functions
jonastheis Nov 12, 2025
2f2960c
implement test for gap detection for batch and L1 messages. fix issue…
jonastheis Nov 13, 2025
d35eba1
implement gap and skip detection for revert events
jonastheis Nov 13, 2025
8ed53a6
Merge remote-tracking branch 'origin' into feat/self-healing-l1-events
jonastheis Nov 20, 2025
875f745
fixes after merge
jonastheis Nov 21, 2025
54bb0c3
fix test shutdown_consolidates_most_recent_batch_on_startup
jonastheis Nov 21, 2025
c19beb8
refactor test_batch_commit_gap with test fixture
jonastheis Nov 21, 2025
32587a2
refactor test_l1_message_gap with test fixture
jonastheis Nov 21, 2025
7eaf491
refactor test_batch_revert_gap with test fixture
jonastheis Nov 21, 2025
283bcf9
fix derivation pipeline benchmarks due to derivation pipeline using b…
jonastheis Nov 25, 2025
ba5497a
Merge remote-tracking branch 'origin/main' into feat/self-healing-l1-…
jonastheis Nov 25, 2025
1a7949f
fixes after merge
jonastheis Nov 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/chain-orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ alloy-transport.workspace = true
# rollup-node
scroll-db = { workspace = true, features = ["test-utils"] }
rollup-node-primitives = { workspace = true, features = ["arbitrary"] }
rollup-node-watcher = { workspace = true, features = ["test-utils"] }

# scroll
reth-scroll-chainspec.workspace = true
reth-scroll-forks.workspace = true
reth-scroll-node = { workspace = true, features = ["test-utils"] }

# reth
reth-eth-wire-types.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions crates/chain-orchestrator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub enum ChainOrchestratorError {
/// missing.
#[error("L1 message queue gap detected at index {0}, previous L1 message not found")]
L1MessageQueueGap(u64),
/// A duplicate L1 message was detected at index {0}.
#[error("Duplicate L1 message detected at index {0}")]
DuplicateL1Message(u64),
/// An inconsistency was detected when trying to consolidate the chain.
#[error("Chain inconsistency detected")]
ChainInconsistency,
Expand All @@ -60,6 +63,9 @@ pub enum ChainOrchestratorError {
/// A gap was detected in batch commit events: the previous batch before index {0} is missing.
#[error("Batch commit gap detected at index {0}, previous batch commit not found")]
BatchCommitGap(u64),
/// A duplicate batch commit was detected at index {0}.
#[error("Duplicate batch commit detected at {0}")]
DuplicateBatchCommit(BatchInfo),
/// An error occurred while making a network request.
#[error("Network request error: {0}")]
NetworkRequestError(#[from] reth_network_p2p::error::RequestError),
Expand Down Expand Up @@ -92,6 +98,9 @@ pub enum ChainOrchestratorError {
/// An error occurred while handling rollup node primitives.
#[error("An error occurred while handling rollup node primitives: {0}")]
RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError),
/// An error occurred during gap reset.
#[error("Gap reset error: {0}")]
GapResetError(String),
}

impl CanRetry for ChainOrchestratorError {
Expand Down
372 changes: 365 additions & 7 deletions crates/chain-orchestrator/src/lib.rs

Large diffs are not rendered by default.

35 changes: 26 additions & 9 deletions crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,12 @@ impl DatabaseReadOperations for Database {
async fn get_batch_by_index(
&self,
batch_index: u64,
processed: Option<bool>,
) -> Result<Option<BatchCommitData>, DatabaseError> {
metered!(
DatabaseOperation::GetBatchByIndex,
self,
tx(move |tx| async move { tx.get_batch_by_index(batch_index).await })
tx(move |tx| async move { tx.get_batch_by_index(batch_index, processed).await })
)
}

Expand Down Expand Up @@ -453,6 +454,22 @@ impl DatabaseReadOperations for Database {
)
}

async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
metered!(
DatabaseOperation::GetLastBatchCommitL1Block,
self,
tx(|tx| async move { tx.get_last_batch_commit_l1_block().await })
)
}

async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
metered!(
DatabaseOperation::GetLastL1MessageL1Block,
self,
tx(|tx| async move { tx.get_last_l1_message_l1_block().await })
)
}

async fn get_n_l1_messages(
&self,
start: Option<L1MessageKey>,
Expand Down Expand Up @@ -719,7 +736,7 @@ mod test {
// Round trip the BatchCommitData through the database.
db.insert_batch(batch_commit.clone()).await.unwrap();
let batch_commit_from_db =
db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap();
db.get_batch_by_index(batch_commit.index, None).await.unwrap().unwrap();

assert_eq!(batch_commit, batch_commit_from_db);
}
Expand Down Expand Up @@ -1233,7 +1250,7 @@ mod test {

// Insert L2 blocks with different batch indices
for i in 100..110 {
let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
let batch_data = db.get_batch_by_index(i, None).await.unwrap().unwrap();
let batch_info: BatchInfo = batch_data.into();
let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };

Expand Down Expand Up @@ -1402,9 +1419,9 @@ mod test {
db.set_finalized_l1_block_number(21).await.unwrap();

// Verify the batches and blocks were inserted correctly
let retrieved_batch_1 = db.get_batch_by_index(1).await.unwrap().unwrap();
let retrieved_batch_2 = db.get_batch_by_index(2).await.unwrap().unwrap();
let retrieved_batch_3 = db.get_batch_by_index(3).await.unwrap().unwrap();
let retrieved_batch_1 = db.get_batch_by_index(1, None).await.unwrap().unwrap();
let retrieved_batch_2 = db.get_batch_by_index(2, None).await.unwrap().unwrap();
let retrieved_batch_3 = db.get_batch_by_index(3, None).await.unwrap().unwrap();
let retried_block_1 = db.get_l2_block_info_by_number(1).await.unwrap().unwrap();
let retried_block_2 = db.get_l2_block_info_by_number(2).await.unwrap().unwrap();
let retried_block_3 = db.get_l2_block_info_by_number(3).await.unwrap().unwrap();
Expand All @@ -1425,9 +1442,9 @@ mod test {
assert_eq!(result, (Some(block_2), Some(11)));

// Verify that batches 2 and 3 are deleted
let batch_1 = db.get_batch_by_index(1).await.unwrap();
let batch_2 = db.get_batch_by_index(2).await.unwrap();
let batch_3 = db.get_batch_by_index(3).await.unwrap();
let batch_1 = db.get_batch_by_index(1, None).await.unwrap();
let batch_2 = db.get_batch_by_index(2, None).await.unwrap();
let batch_3 = db.get_batch_by_index(3, None).await.unwrap();
assert!(batch_1.is_some());
assert!(batch_2.is_none());
assert!(batch_3.is_none());
Expand Down
4 changes: 4 additions & 0 deletions crates/database/db/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub(crate) enum DatabaseOperation {
GetFinalizedL1BlockNumber,
GetProcessedL1BlockNumber,
GetL2HeadBlockNumber,
GetLastBatchCommitL1Block,
GetLastL1MessageL1Block,
GetNL1Messages,
GetNL2BlockDataHint,
GetL2BlockAndBatchInfoByHash,
Expand Down Expand Up @@ -92,6 +94,8 @@ impl DatabaseOperation {
Self::GetFinalizedL1BlockNumber => "get_finalized_l1_block_number",
Self::GetProcessedL1BlockNumber => "get_processed_l1_block_number",
Self::GetL2HeadBlockNumber => "get_l2_head_block_number",
Self::GetLastBatchCommitL1Block => "get_last_batch_commit_l1_block",
Self::GetLastL1MessageL1Block => "get_last_l1_message_l1_block",
Self::GetNL1Messages => "get_n_l1_messages",
Self::GetNL2BlockDataHint => "get_n_l2_block_data_hint",
Self::GetL2BlockAndBatchInfoByHash => "get_l2_block_and_batch_info_by_hash",
Expand Down
55 changes: 47 additions & 8 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
.map(|(_, batch_info)| batch_info)
.filter(|b| b.index > 1)
{
let batch = self.get_batch_by_index(batch_info.index).await?.expect("batch must exist");
let batch =
self.get_batch_by_index(batch_info.index, None).await?.expect("batch must exist");
self.delete_batches_gt_block_number(batch.block_number.saturating_sub(1)).await?;
};

Expand All @@ -383,7 +384,8 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
else {
return Ok((None, None));
};
let batch = self.get_batch_by_index(batch_info.index).await?.expect("batch must exist");
let batch =
self.get_batch_by_index(batch_info.index, None).await?.expect("batch must exist");
Ok((Some(block_info), Some(batch.block_number.saturating_add(1))))
}

Expand Down Expand Up @@ -649,6 +651,7 @@ pub trait DatabaseReadOperations {
async fn get_batch_by_index(
&self,
batch_index: u64,
processed: Option<bool>,
) -> Result<Option<BatchCommitData>, DatabaseError>;

/// Get the latest L1 block number from the database.
Expand All @@ -663,6 +666,12 @@ pub trait DatabaseReadOperations {
/// Get the latest L2 head block info.
async fn get_l2_head_block_number(&self) -> Result<u64, DatabaseError>;

/// Get the L1 block number of the last batch commit in the database.
async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError>;

/// Get the L1 block number of the last L1 message in the database.
async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError>;

/// Get a vector of n [`L1MessageEnvelope`]s in the database starting from the provided `start`
/// point.
async fn get_n_l1_messages(
Expand Down Expand Up @@ -721,13 +730,21 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
async fn get_batch_by_index(
&self,
batch_index: u64,
processed: Option<bool>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of adding the processed filter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought I needed it at some point. reverted.

) -> Result<Option<BatchCommitData>, DatabaseError> {
Ok(models::batch_commit::Entity::find_by_id(
TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"),
)
.one(self.get_connection())
.await
.map(|x| x.map(Into::into))?)
let query = if let Some(p) = processed {
models::batch_commit::Entity::find().filter(
models::batch_commit::Column::Index
.eq(TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"))
.and(models::batch_commit::Column::Processed.eq(p)),
)
} else {
models::batch_commit::Entity::find_by_id(
TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"),
)
};

Ok(query.one(self.get_connection()).await.map(|x| x.map(Into::into))?)
}

async fn get_latest_l1_block_number(&self) -> Result<u64, DatabaseError> {
Expand Down Expand Up @@ -782,6 +799,28 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
.expect("l2_head_block should always be a valid u64"))
}

async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
Ok(models::batch_commit::Entity::find()
.order_by_desc(models::batch_commit::Column::BlockNumber)
.select_only()
.column(models::batch_commit::Column::BlockNumber)
.into_tuple::<i64>()
.one(self.get_connection())
.await?
.map(|block_number| block_number as u64))
}

async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
Ok(models::l1_message::Entity::find()
.order_by_desc(models::l1_message::Column::L1BlockNumber)
.select_only()
.column(models::l1_message::Column::L1BlockNumber)
.into_tuple::<i64>()
.one(self.get_connection())
.await?
.map(|block_number| block_number as u64))
}

async fn get_n_l1_messages(
&self,
start: Option<L1MessageKey>,
Expand Down
2 changes: 1 addition & 1 deletion crates/derivation-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ where

// get the batch commit data.
let batch = db
.get_batch_by_index(batch_info.index)
.get_batch_by_index(batch_info.index, None)
.await
.map_err(|err| (batch_info.clone(), err.into()))?
.ok_or((
Expand Down
58 changes: 29 additions & 29 deletions crates/node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,35 +342,34 @@ impl ScrollRollupNodeConfig {
};
let consensus = self.consensus_args.consensus(authorized_signer)?;

let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
if let Some(provider) = l1_provider.filter(|_| !self.test) {
tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher");
(
None,
Some(
L1Watcher::spawn(
provider,
l1_start_block_number,
node_config,
self.l1_provider_args.logs_query_block_range,
)
.await,
),
)
} else {
// Create a channel for L1 notifications that we can use to inject L1 messages for
// testing
#[cfg(feature = "test-utils")]
{
let (tx, rx) = tokio::sync::mpsc::channel(1000);
(Some(tx), Some(rx))
}

#[cfg(not(feature = "test-utils"))]
{
(None, None)
}
};
let (l1_notification_tx, l1_notification_rx, l1_watcher_handle): (
Option<Sender<Arc<L1Notification>>>,
_,
Option<rollup_node_watcher::L1WatcherHandle>,
) = if let Some(provider) = l1_provider.filter(|_| !self.test) {
tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher");
let (rx, handle) = L1Watcher::spawn(
provider,
l1_start_block_number,
node_config,
self.l1_provider_args.logs_query_block_range,
)
.await;
(None, Some(rx), Some(handle))
} else {
// Create a channel for L1 notifications that we can use to inject L1 messages for
// testing
#[cfg(feature = "test-utils")]
{
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a L1 watcher handle and receiver channel here that can be used for testing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in 41b3ead

(Some(tx), Some(rx), None)
}

#[cfg(not(feature = "test-utils"))]
{
(None, None, None)
}
};

// Construct the l1 provider.
let l1_messages_provider = db.clone();
Expand Down Expand Up @@ -450,6 +449,7 @@ impl ScrollRollupNodeConfig {
Arc::new(block_client),
l2_provider,
l1_notification_rx.expect("L1 notification receiver should be set"),
l1_watcher_handle,
scroll_network_handle.into_scroll_network().await,
consensus,
engine,
Expand Down
26 changes: 17 additions & 9 deletions crates/node/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256};
use alloy_rpc_types_eth::Block;
use alloy_signer::Signer;
use alloy_signer_local::PrivateKeySigner;
use eyre::Ok;
use eyre::{bail, Ok};
use futures::{task::noop_waker_ref, FutureExt, StreamExt};
use reth_chainspec::EthChainSpec;
use reth_e2e_test_utils::{NodeHelperType, TmpDB};
Expand Down Expand Up @@ -48,7 +48,7 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tokio::{sync::Mutex, time};
use tokio::{select, sync::Mutex, time};
use tracing::trace;

#[tokio::test]
Expand Down Expand Up @@ -1025,20 +1025,28 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<()
// Lets finalize the second batch.
l1_notification_tx.send(Arc::new(L1Notification::Finalized(batch_1_data.block_number))).await?;

let mut l2_block = None;
// Lets fetch the first consolidated block event - this should be the first block of the batch.
let l2_block = loop {
if let Some(ChainOrchestratorEvent::BlockConsolidated(consolidation_outcome)) =
rnm_events.next().await
{
break consolidation_outcome.block_info().clone();
select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => {
bail!("Timed out waiting for first consolidated block after RNM restart");
}
};

evt = rnm_events.next() => {
if let Some(ChainOrchestratorEvent::BlockConsolidated(consolidation_outcome)) = evt {
l2_block = Some(consolidation_outcome.block_info().clone());
} else {
println!("Received unexpected event: {:?}", evt);
}
}
}

// One issue #273 is completed, we will again have safe blocks != finalized blocks, and this
// should be changed to 1. Assert that the consolidated block is the first block that was not
// previously processed of the batch.
assert_eq!(
l2_block.block_info.number, 41,
l2_block.unwrap().block_info.number,
41,
"Consolidated block number does not match expected number"
);

Expand Down
19 changes: 19 additions & 0 deletions crates/watcher/src/handle/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::L1Notification;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};

/// Commands that can be sent to the L1 Watcher.
#[derive(Debug)]
pub enum L1WatcherCommand {
/// Reset the watcher to a specific L1 block number.
///
/// This is used for gap recovery when the chain orchestrator detects missing L1 events.
ResetToBlock {
/// The L1 block number to reset to (last known good state)
block: u64,
/// New sender to replace the current notification channel
new_sender: mpsc::Sender<Arc<L1Notification>>,
/// Oneshot sender to signal completion of the reset operation
response_sender: oneshot::Sender<()>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really needed. removed.

},
}
Loading
Loading