Skip to content

Commit 3ab474c

Browse files
committed
feat: a task that updates the final/safe/canonical when ingesting
1 parent bdcc098 commit 3ab474c

File tree

4 files changed

+164
-75
lines changed

4 files changed

+164
-75
lines changed

crates/db/src/journal/ingestor.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use crate::{SignetDbRw, journal::JournalDb};
2+
use futures_util::{Stream, StreamExt};
3+
use reth::providers::ProviderResult;
4+
use signet_node_types::NodeTypesDbTrait;
5+
use signet_types::primitives::SealedHeader;
6+
use std::sync::Arc;
7+
use tokio::task::JoinHandle;
8+
use trevm::journal::BlockUpdate;
9+
10+
/// A task that ingests journals into a reth database.
11+
#[derive(Debug)]
12+
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
13+
db: Arc<SignetDbRw<Db>>,
14+
}
15+
16+
impl<Db: NodeTypesDbTrait> From<SignetDbRw<Db>> for JournalIngestor<Db> {
17+
fn from(value: SignetDbRw<Db>) -> Self {
18+
Self::new(value.into())
19+
}
20+
}
21+
22+
impl<Db: NodeTypesDbTrait> From<Arc<SignetDbRw<Db>>> for JournalIngestor<Db> {
23+
fn from(value: Arc<SignetDbRw<Db>>) -> Self {
24+
Self::new(value)
25+
}
26+
}
27+
28+
impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
29+
/// Create a new `JournalIngestor` with the given database provider.
30+
pub const fn new(db: Arc<SignetDbRw<Db>>) -> Self {
31+
Self { db }
32+
}
33+
34+
async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
35+
where
36+
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
37+
{
38+
while let Some(item) = stream.next().await {
39+
// FUTURE: Sanity check that the header height matches the update
40+
// height. Sanity check that both heights are 1 greater than the
41+
// last height in the database.
42+
43+
let db = self.db.clone();
44+
let (header, block_update) = item;
45+
46+
// DB interaction is sync, so we spawn a blocking task for it. We
47+
// immediately await that task. This prevents blocking the worker
48+
// thread
49+
tokio::task::spawn_blocking(move || db.ingest(header, block_update))
50+
.await
51+
.expect("ingestion should not panic")?;
52+
}
53+
// Stream has ended, return Ok
54+
Ok(())
55+
}
56+
57+
/// Spawn a task to ingest journals from the provided stream.
58+
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
59+
where
60+
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
61+
{
62+
tokio::spawn(self.task_future(stream))
63+
}
64+
}
65+
66+
/// Ingest journals from a stream into a reth database.
67+
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
68+
where
69+
Db: NodeTypesDbTrait,
70+
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
71+
{
72+
let ingestor = JournalIngestor::new(db);
73+
ingestor.task_future(stream).await
74+
}

crates/db/src/journal/mod.rs

Lines changed: 4 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,76 +3,8 @@
33
mod r#trait;
44
pub use r#trait::JournalDb;
55

6-
use crate::SignetDbRw;
7-
use futures_util::{Stream, StreamExt};
8-
use reth::providers::ProviderResult;
9-
use signet_node_types::NodeTypesDbTrait;
10-
use std::sync::Arc;
11-
use tokio::task::JoinHandle;
12-
use trevm::journal::BlockUpdate;
6+
mod provider;
7+
pub use provider::JournalProviderTask;
138

14-
/// A task that ingests journals into a reth database.
15-
#[derive(Debug)]
16-
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
17-
db: Arc<SignetDbRw<Db>>,
18-
}
19-
20-
impl<Db: NodeTypesDbTrait> From<SignetDbRw<Db>> for JournalIngestor<Db> {
21-
fn from(value: SignetDbRw<Db>) -> Self {
22-
Self::new(value.into())
23-
}
24-
}
25-
26-
impl<Db: NodeTypesDbTrait> From<Arc<SignetDbRw<Db>>> for JournalIngestor<Db> {
27-
fn from(value: Arc<SignetDbRw<Db>>) -> Self {
28-
Self::new(value)
29-
}
30-
}
31-
32-
impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
33-
/// Create a new `JournalIngestor` with the given database provider.
34-
pub const fn new(db: Arc<SignetDbRw<Db>>) -> Self {
35-
Self { db }
36-
}
37-
38-
async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
39-
where
40-
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
41-
{
42-
while let Some(item) = stream.next().await {
43-
// FUTURE: Sanity check that the header height matches the update
44-
// height. Sanity check that both heights are 1 greater than the
45-
// last height in the database.
46-
47-
let db = self.db.clone();
48-
let (header, block_update) = item;
49-
50-
// DB interaction is sync, so we spawn a blocking task for it. We
51-
// immediately await that task. This prevents blocking the worker
52-
// thread
53-
tokio::task::spawn_blocking(move || db.ingest(&header, block_update))
54-
.await
55-
.expect("ingestion should not panic")?;
56-
}
57-
// Stream has ended, return Ok
58-
Ok(())
59-
}
60-
61-
/// Spawn a task to ingest journals from the provided stream.
62-
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
63-
where
64-
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
65-
{
66-
tokio::spawn(self.task_future(stream))
67-
}
68-
}
69-
70-
/// Ingest journals from a stream into a reth database.
71-
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
72-
where
73-
Db: NodeTypesDbTrait,
74-
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
75-
{
76-
let ingestor = JournalIngestor::new(db);
77-
ingestor.task_future(stream).await
78-
}
9+
mod ingestor;
10+
pub use ingestor::{JournalIngestor, ingest_journals};

crates/db/src/journal/provider.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use crate::{DataCompat, journal::JournalDb};
2+
use futures_util::{Stream, StreamExt};
3+
use reth::{
4+
providers::{
5+
CanonChainTracker, DatabaseProviderFactory, DatabaseProviderRW, ProviderResult,
6+
providers::BlockchainProvider,
7+
},
8+
rpc::types::engine::ForkchoiceState,
9+
};
10+
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
11+
use signet_types::primitives::SealedHeader;
12+
use tokio::task::JoinHandle;
13+
use trevm::journal::BlockUpdate;
14+
15+
/// A task that processes journal updates for a specific database, and calls
16+
/// the appropriate methods on a [`BlockchainProvider`] to update the in-memory
17+
/// chain view.
18+
#[derive(Debug, Clone)]
19+
pub struct JournalProviderTask<Db: NodeTypesDbTrait> {
20+
provider: BlockchainProvider<SignetNodeTypes<Db>>,
21+
}
22+
23+
impl<Db: NodeTypesDbTrait> JournalProviderTask<Db> {
24+
/// Instantiate a new task.
25+
pub const fn new(provider: BlockchainProvider<SignetNodeTypes<Db>>) -> Self {
26+
Self { provider }
27+
}
28+
29+
/// Get a reference to the provider.
30+
pub const fn provider(&self) -> &BlockchainProvider<SignetNodeTypes<Db>> {
31+
&self.provider
32+
}
33+
34+
/// Deconstruct the task into its provider.
35+
pub fn into_inner(self) -> BlockchainProvider<SignetNodeTypes<Db>> {
36+
self.provider
37+
}
38+
39+
/// Create a future for the task, suitable for [`tokio::spawn`] or another
40+
/// task-spawning system.
41+
pub async fn task_future<S>(self, mut journals: S) -> ProviderResult<()>
42+
where
43+
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
44+
{
45+
loop {
46+
let Some((header, block_update)) = journals.next().await else { break };
47+
48+
let block_hash = header.hash();
49+
50+
let rw = self.provider.database_provider_rw().map(DatabaseProviderRW);
51+
52+
let r_header = header.clone_convert();
53+
54+
// DB interaction is sync, so we spawn a blocking task for it. We
55+
// immediately await that task. This prevents blocking the worker
56+
// thread
57+
tokio::task::spawn_blocking(move || rw?.ingest(header, block_update))
58+
.await
59+
.expect("ingestion should not panic")?;
60+
61+
self.provider.set_canonical_head(r_header.clone());
62+
self.provider.set_safe(r_header.clone());
63+
self.provider.set_finalized(r_header);
64+
self.provider.on_forkchoice_update_received(&ForkchoiceState {
65+
head_block_hash: block_hash,
66+
safe_block_hash: block_hash,
67+
finalized_block_hash: block_hash,
68+
});
69+
}
70+
71+
Ok(())
72+
}
73+
74+
/// Spawn the journal provider task.
75+
pub fn spawn<S>(self, journals: S) -> JoinHandle<ProviderResult<()>>
76+
where
77+
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
78+
{
79+
tokio::spawn(self.task_future(journals))
80+
}
81+
}

crates/db/src/journal/trait.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub trait JournalDb: RuWriter {
1818
///
1919
/// This is intended to be used for tx simulation, and other purposes that
2020
/// need fast state access WITHTOUT needing to retrieve historical data.
21-
fn ingest(&self, header: &Header, update: BlockUpdate<'_>) -> ProviderResult<()> {
21+
fn ingest(&self, header: SealedHeader, update: BlockUpdate<'_>) -> ProviderResult<()> {
2222
let journal_hash = update.journal_hash();
2323

2424
// TODO: remove the clone in future versions. This can be achieved by
@@ -29,7 +29,7 @@ pub trait JournalDb: RuWriter {
2929
let execution_outcome = ExecutionOutcome::new(bundle_state, vec![], header.number());
3030

3131
let block: SealedBlock<TransactionSigned, Header> =
32-
SealedBlock { header: SealedHeader::new(header.to_owned()), body: Default::default() };
32+
SealedBlock { header, body: Default::default() };
3333
let block_result =
3434
BlockResult { sealed_block: RecoveredBlock::new(block, vec![]), execution_outcome };
3535

@@ -40,7 +40,9 @@ pub trait JournalDb: RuWriter {
4040
std::iter::empty(),
4141
&block_result,
4242
journal_hash,
43-
)
43+
)?;
44+
45+
Ok(())
4446
}
4547
}
4648

0 commit comments

Comments
 (0)