Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions crates/ingress-rpc/src/validation.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_consensus::{Transaction, Typed2718, constants::KECCAK_EMPTY, transaction::Recovered};
use alloy_primitives::{Address, B256, U256};
use alloy_provider::{Provider, RootProvider};
Expand Down Expand Up @@ -49,24 +50,29 @@ pub trait L1BlockInfoLookup: Send + Sync {
#[async_trait]
impl L1BlockInfoLookup for RootProvider<Optimism> {
async fn fetch_l1_block_info(&self) -> RpcResult<L1BlockInfo> {
let block_number = self
.get_block_number()
.await
.map_err(|_| EthApiError::InternalEthError.into_rpc_err())?;
let block = self
.get_block_by_number(block_number.into())
.get_block(BlockId::Number(BlockNumberOrTag::Latest))
.full()
.await
.map_err(|_| EthApiError::InternalEthError.into_rpc_err())?
.ok_or_else(|| EthApiError::HeaderNotFound(block_number.into()).into_rpc_err())?;
.map_err(|e| {
warn!(message = "failed to fetch latest block", err = %e);
EthApiError::InternalEthError.into_rpc_err()
})?
.ok_or_else(|| {
warn!(message = "empty latest block returned");
EthApiError::InternalEthError.into_rpc_err()
})?;

let txs = block.transactions.clone();
let first_tx = txs
.first_transaction()
.ok_or_else(|| EthApiError::InternalEthError.into_rpc_err())?;

Ok(extract_l1_info_from_tx(&first_tx.clone())
.map_err(|_| EthApiError::InternalEthError.into_rpc_err())?)
let first_tx = txs.first_transaction().ok_or_else(|| {
warn!(message = "block contains no transactions");
EthApiError::InternalEthError.into_rpc_err()
})?;

Ok(extract_l1_info_from_tx(&first_tx.clone()).map_err(|e| {
warn!(message = "failed to extract l1_info from tx", err = %e);
EthApiError::InternalEthError.into_rpc_err()
})?)
}
}

Expand Down
35 changes: 26 additions & 9 deletions crates/maintenance/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::collections::HashSet;
use std::time::Duration;
use tips_audit::{BundleEvent, BundleEventPublisher, DropReason};
use tips_datastore::BundleDatastore;
use tips_datastore::postgres::{BundleFilter, BundleState, BundleWithMetadata};
use tips_datastore::postgres::{BlockInfoUpdate, BundleFilter, BundleState, BundleWithMetadata};
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

pub struct MaintenanceJob<S: BundleDatastore, P: Provider<Optimism>, K: BundleEventPublisher> {
Expand Down Expand Up @@ -54,6 +54,11 @@ impl<S: BundleDatastore, P: Provider<Optimism>, K: BundleEventPublisher> Mainten
.await?
.ok_or_else(|| anyhow::anyhow!("Failed to get latest block"))?;

debug!(
message = "Executing up to latest block",
block_number = latest_block.number()
);

let block_info = self.store.get_current_block_info().await?;

if let Some(current_block_info) = block_info {
Expand All @@ -62,6 +67,8 @@ impl<S: BundleDatastore, P: Provider<Optimism>, K: BundleEventPublisher> Mainten
for block_num in
(current_block_info.latest_block_number + 1)..=latest_block.header.number
{
debug!(message = "Fetching block number", ?latest_block);

let block = self
.node
.get_block(BlockId::Number(alloy_rpc_types::BlockNumberOrTag::Number(
Expand All @@ -71,12 +78,19 @@ impl<S: BundleDatastore, P: Provider<Optimism>, K: BundleEventPublisher> Mainten
.await?
.ok_or_else(|| anyhow::anyhow!("Failed to get block {}", block_num))?;

let hash = block.hash();
self.on_new_block(block).await?;
self.store
.commit_block_info(vec![BlockInfoUpdate {
block_number: block_num,
block_hash: hash,
}])
.await?;
}
}
} else {
warn!("No block info found in database, initializing with latest block as finalized");
let block_update = tips_datastore::postgres::BlockInfoUpdate {
let block_update = BlockInfoUpdate {
block_number: latest_block.header.number,
block_hash: latest_block.header.hash,
};
Expand Down Expand Up @@ -142,33 +156,36 @@ impl<S: BundleDatastore, P: Provider<Optimism>, K: BundleEventPublisher> Mainten
loop {
tokio::select! {
_ = maintenance_interval.tick() => {
info!(message = "starting maintenance");
match self.periodic_maintenance().await {
Ok(_) => {
info!("Periodic maintenance completed");
info!(message = "Periodic maintenance completed");
},
Err(err) => {
error!("Error in periodic maintenance: {:?}", err);
error!(message = "Error in periodic maintenance", error = %err);
}

}
}
_ = execution_interval.tick() => {
info!(message = "starting execution run");
match self.execute().await {
Ok(_) => {
info!("Successfully executed maintenance run");
info!(message = "Successfully executed maintenance run");
}
Err(e) => {
error!("Error executing maintenance run: {:?}", e);
error!(message = "Error executing maintenance run", error = %e);
}
}
}
Some(flashblock) = fb_rx.recv() => {
info!(message = "starting flashblock processing");
match self.process_flashblock(flashblock).await {
Ok(_) => {
info!("Successfully processed flashblock");
info!(message = "Successfully processed flashblock");
}
Err(e) => {
error!("Error processing flashblock: {:?}", e);
error!(message = "Error processing flashblock", error = %e);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions ui/src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import * as schema from "./schema";

const pool = new Pool({
connectionString: process.env.TIPS_DATABASE_URL,
ssl: {
requestCert: false,
rejectUnauthorized: false,
},
});

export const db = drizzle(pool, { schema });