Skip to content

Commit 6a9d979

Browse files
committed
refactor: make it depend on signet-journal
1 parent 3ab474c commit 6a9d979

File tree

6 files changed

+28
-28
lines changed

6 files changed

+28
-28
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ signet-bundle = "0.10.0"
4545
signet-constants = "0.10.0"
4646
signet-evm = "0.10.0"
4747
signet-extract = "0.10.0"
48+
signet-journal = "0.10.0"
4849
signet-tx-cache = "0.10.0"
4950
signet-types = "0.10.0"
5051
signet-zenith = "0.10.0"

crates/db/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@ repository.workspace = true
1212
signet-node-types.workspace = true
1313

1414
signet-evm.workspace = true
15+
signet-journal.workspace = true
1516
signet-types.workspace = true
1617
signet-zenith.workspace = true
1718

18-
trevm.workspace = true
19-
2019
alloy.workspace = true
2120

2221
reth.workspace = true

crates/db/src/convert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use alloy::consensus::TxReceipt;
1111

1212
/// Trait for types that can be converted into other types as they're already compatible.
13-
/// Uswed for converting between alloy/reth/signet types.
13+
/// Used for converting between alloy/reth/signet types.
1414
pub trait DataCompat<Other: DataCompat<Self>>: Sized {
1515
/// Convert `self` into the target type.
1616
fn convert(self) -> Other;

crates/db/src/journal/ingestor.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use crate::{SignetDbRw, journal::JournalDb};
2-
use futures_util::{Stream, StreamExt};
2+
use futures_util::StreamExt;
33
use reth::providers::ProviderResult;
4+
use signet_journal::{Journal, JournalStream};
45
use signet_node_types::NodeTypesDbTrait;
5-
use signet_types::primitives::SealedHeader;
66
use std::sync::Arc;
77
use tokio::task::JoinHandle;
8-
use trevm::journal::BlockUpdate;
98

109
/// A task that ingests journals into a reth database.
1110
#[derive(Debug)]
@@ -33,20 +32,19 @@ impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
3332

3433
async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
3534
where
36-
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
35+
S: JournalStream<'static> + Send + Unpin + 'static,
3736
{
38-
while let Some(item) = stream.next().await {
37+
while let Some(Journal::V1(journal)) = stream.next().await {
3938
// FUTURE: Sanity check that the header height matches the update
4039
// height. Sanity check that both heights are 1 greater than the
4140
// last height in the database.
4241

4342
let db = self.db.clone();
44-
let (header, block_update) = item;
4543

4644
// DB interaction is sync, so we spawn a blocking task for it. We
4745
// immediately await that task. This prevents blocking the worker
4846
// thread
49-
tokio::task::spawn_blocking(move || db.ingest(header, block_update))
47+
tokio::task::spawn_blocking(move || db.ingest(journal))
5048
.await
5149
.expect("ingestion should not panic")?;
5250
}
@@ -57,7 +55,7 @@ impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
5755
/// Spawn a task to ingest journals from the provided stream.
5856
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
5957
where
60-
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
58+
S: JournalStream<'static> + Send + Unpin + 'static,
6159
{
6260
tokio::spawn(self.task_future(stream))
6361
}
@@ -67,7 +65,7 @@ impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
6765
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
6866
where
6967
Db: NodeTypesDbTrait,
70-
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
68+
S: JournalStream<'static> + Send + Unpin + 'static,
7169
{
7270
let ingestor = JournalIngestor::new(db);
7371
ingestor.task_future(stream).await

crates/db/src/journal/provider.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
use crate::{DataCompat, journal::JournalDb};
2-
use futures_util::{Stream, StreamExt};
1+
use crate::journal::JournalDb;
2+
use futures_util::StreamExt;
33
use reth::{
4+
primitives::SealedHeader,
45
providers::{
56
CanonChainTracker, DatabaseProviderFactory, DatabaseProviderRW, ProviderResult,
67
providers::BlockchainProvider,
78
},
89
rpc::types::engine::ForkchoiceState,
910
};
11+
use signet_journal::{Journal, JournalStream};
1012
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
11-
use signet_types::primitives::SealedHeader;
1213
use tokio::task::JoinHandle;
13-
use trevm::journal::BlockUpdate;
1414

1515
/// A task that processes journal updates for a specific database, and calls
1616
/// the appropriate methods on a [`BlockchainProvider`] to update the in-memory
@@ -40,21 +40,20 @@ impl<Db: NodeTypesDbTrait> JournalProviderTask<Db> {
4040
/// task-spawning system.
4141
pub async fn task_future<S>(self, mut journals: S) -> ProviderResult<()>
4242
where
43-
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
43+
S: JournalStream<'static> + Send + Unpin + 'static,
4444
{
4545
loop {
46-
let Some((header, block_update)) = journals.next().await else { break };
47-
48-
let block_hash = header.hash();
46+
let Some(Journal::V1(journal)) = journals.next().await else { break };
4947

5048
let rw = self.provider.database_provider_rw().map(DatabaseProviderRW);
5149

52-
let r_header = header.clone_convert();
50+
let r_header = SealedHeader::new_unhashed(journal.header().clone());
51+
let block_hash = r_header.hash();
5352

5453
// DB interaction is sync, so we spawn a blocking task for it. We
5554
// immediately await that task. This prevents blocking the worker
5655
// thread
57-
tokio::task::spawn_blocking(move || rw?.ingest(header, block_update))
56+
tokio::task::spawn_blocking(move || rw?.ingest(journal))
5857
.await
5958
.expect("ingestion should not panic")?;
6059

@@ -74,7 +73,7 @@ impl<Db: NodeTypesDbTrait> JournalProviderTask<Db> {
7473
/// Spawn the journal provider task.
7574
pub fn spawn<S>(self, journals: S) -> JoinHandle<ProviderResult<()>>
7675
where
77-
S: Stream<Item = (SealedHeader, BlockUpdate<'static>)> + Send + Unpin + 'static,
76+
S: JournalStream<'static> + Send + Unpin + 'static,
7877
{
7978
tokio::spawn(self.task_future(journals))
8079
}

crates/db/src/journal/trait.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::RuWriter;
22
use alloy::consensus::{BlockHeader, Header};
33
use reth::{providers::ProviderResult, revm::db::BundleState};
44
use signet_evm::{BlockResult, ExecutionOutcome};
5+
use signet_journal::HostJournal;
56
use signet_types::primitives::{RecoveredBlock, SealedBlock, SealedHeader, TransactionSigned};
6-
use trevm::journal::BlockUpdate;
77

88
/// A database that can be updated with journals.
99
pub trait JournalDb: RuWriter {
@@ -18,18 +18,21 @@ pub trait JournalDb: RuWriter {
1818
///
1919
/// This is intended to be used for tx simulation, and other purposes that
2020
/// need fast state access WITHTOUT needing to retrieve historical data.
21-
fn ingest(&self, header: SealedHeader, update: BlockUpdate<'_>) -> ProviderResult<()> {
22-
let journal_hash = update.journal_hash();
21+
fn ingest(&self, journal: HostJournal<'static>) -> ProviderResult<()> {
22+
let journal_hash = journal.journal_hash();
23+
24+
let (meta, bsi) = journal.into_parts();
25+
let (_, _, header) = meta.into_parts();
2326

2427
// TODO: remove the clone in future versions. This can be achieved by
2528
// _NOT_ making a `BlockResult` and instead manually updating relevan
2629
// tables. However, this means diverging more fro the underlying reth
2730
// logic that we are currently re-using.
28-
let bundle_state: BundleState = update.journal().clone().into();
31+
let bundle_state: BundleState = bsi.into();
2932
let execution_outcome = ExecutionOutcome::new(bundle_state, vec![], header.number());
3033

3134
let block: SealedBlock<TransactionSigned, Header> =
32-
SealedBlock { header, body: Default::default() };
35+
SealedBlock { header: SealedHeader::new(header), body: Default::default() };
3336
let block_result =
3437
BlockResult { sealed_block: RecoveredBlock::new(block, vec![]), execution_outcome };
3538

0 commit comments

Comments
 (0)