Skip to content

Commit 69d42d5

Browse files
committed
feat: ingestor task
1 parent df71842 commit 69d42d5

File tree

3 files changed

+85
-1
lines changed

3 files changed

+85
-1
lines changed

crates/db/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@ reth-prune-types.workspace = true
2525

2626
itertools.workspace = true
2727
serde.workspace = true
28-
tracing.workspace = true
28+
tracing.workspace = true
29+
futures-util = "0.3.31"
30+
tokio.workspace = true
31+
auto_impl = "1.3.0"

crates/db/src/journal/mod.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,83 @@
22
33
mod r#trait;
44
pub use r#trait::JournalDb;
5+
6+
use futures_util::{Stream, StreamExt};
7+
use reth::providers::{DatabaseProviderRW, ProviderResult};
8+
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
9+
use std::sync::Arc;
10+
use tokio::task::JoinHandle;
11+
use trevm::journal::BlockUpdate;
12+
13+
/// A task that ingests journals into a reth database.
14+
#[derive(Debug)]
15+
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
16+
db: Arc<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>,
17+
}
18+
19+
impl<Db: NodeTypesDbTrait> From<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>
20+
for JournalIngestor<Db>
21+
{
22+
fn from(value: DatabaseProviderRW<Db, SignetNodeTypes<Db>>) -> Self {
23+
Self::new(value.into())
24+
}
25+
}
26+
27+
impl<Db: NodeTypesDbTrait> From<Arc<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>>
28+
for JournalIngestor<Db>
29+
{
30+
fn from(value: Arc<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>) -> Self {
31+
Self::new(value)
32+
}
33+
}
34+
35+
impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
36+
/// Create a new `JournalIngestor` with the given database provider.
37+
pub const fn new(db: Arc<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>) -> Self {
38+
Self { db }
39+
}
40+
41+
async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
42+
where
43+
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
44+
{
45+
while let Some(item) = stream.next().await {
46+
// FUTURE: Sanity check that the header height matches the update
47+
// height. Sanity check that both heights are 1 greater than the
48+
// last height in the database.
49+
50+
let db = self.db.clone();
51+
let (header, block_update) = item;
52+
53+
// DB interaction is sync, so we spawn a blocking task for it. We
54+
// immediately await that task. This prevents blocking the worker
55+
// thread
56+
tokio::task::spawn_blocking(move || db.ingest(&header, block_update))
57+
.await
58+
.expect("ingestion should not panic")?;
59+
}
60+
// Stream has ended, return Ok
61+
Ok(())
62+
}
63+
64+
/// Spawn a task to ingest journals from the provided stream.
65+
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
66+
where
67+
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
68+
{
69+
tokio::spawn(self.task_future(stream))
70+
}
71+
}
72+
73+
/// Ingest journals from a stream into a reth database.
74+
pub async fn ingest_journals<Db, S>(
75+
db: Arc<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>,
76+
stream: S,
77+
) -> ProviderResult<()>
78+
where
79+
Db: NodeTypesDbTrait,
80+
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
81+
{
82+
let ingestor = JournalIngestor::new(db);
83+
ingestor.task_future(stream).await
84+
}

crates/db/src/traits.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use std::{collections::BTreeMap, ops::RangeInclusive};
1414
use tracing::trace;
1515

1616
/// Writer for [`Passage::Enter`] events.
17+
#[auto_impl::auto_impl(&, Arc, Box)]
1718
pub trait RuWriter {
1819
/// Get the last block number
1920
fn last_block_number(&self) -> ProviderResult<BlockNumber>;

0 commit comments

Comments
 (0)