use crate::{BlobFetcherError, Blobs, FetchResult};
use alloy::primitives::B256;
use reth::network::cache::LruMap;
use reth::transaction_pool::TransactionPool;
use std::{
    sync::{Arc, Mutex},
    time::Duration,
};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, instrument, warn};

/// Maximum number of blob sets kept in the LRU cache, keyed by `(slot, tx_hash)`.
const BLOB_CACHE_SIZE: u32 = 144;
/// Number of times a blob fetch is attempted before giving up.
const FETCH_RETRIES: usize = 3;
/// Delay between blob fetch retries.
const BETWEEN_RETRIES: Duration = Duration::from_millis(250);

/// Instructions for the cache.
///
/// Instructions are sent by the [`CacheHandle`] to the cache task to perform
/// operations such as retrieving blobs.
#[derive(Debug)]
enum CacheInst {
    Retrieve {
        slot: u64,
        tx_hash: B256,
        versioned_hashes: Vec<B256>,
        resp: oneshot::Sender<Blobs>,
    },
}

/// Handle for the cache.
#[derive(Debug, Clone)]
pub struct CacheHandle {
    sender: mpsc::Sender<CacheInst>,
}

impl CacheHandle {
    /// Sends a cache instruction.
    async fn send(&self, inst: CacheInst) {
        let _ = self.sender.send(inst).await;
    }

    /// Fetches blobs from the cache. If the blobs are not already cached, the
    /// cache task fetches them from the underlying fetcher and caches the
    /// result.
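    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `handle` is a [`CacheHandle`] obtained from
    /// [`BlobCacher::spawn`], and that `slot`, `tx_hash`, and
    /// `versioned_hashes` are supplied by the caller:
    ///
    /// ```ignore
    /// let blobs = handle.fetch_blobs(slot, tx_hash, versioned_hashes).await?;
    /// ```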
    pub async fn fetch_blobs(
        &self,
        slot: u64,
        tx_hash: B256,
        versioned_hashes: Vec<B256>,
    ) -> FetchResult<Blobs> {
        let (resp, receiver) = oneshot::channel();

        self.send(CacheInst::Retrieve { slot, tx_hash, versioned_hashes, resp }).await;

        receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash))
    }
}

/// Retrieves blobs and stores them in a cache for later use.
pub struct BlobCacher<Pool> {
    /// Fetches blobs that are not found in the cache.
    fetcher: crate::BlobFetcher<Pool>,

    /// Blobs cached by `(slot, tx_hash)`.
    cache: Mutex<LruMap<(u64, B256), Blobs>>,
}

impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
    }
}

impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
    /// Creates a new `BlobCacher` with the provided fetcher. The cache holds
    /// up to [`BLOB_CACHE_SIZE`] entries.
    pub fn new(fetcher: crate::BlobFetcher<Pool>) -> Self {
        Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
    }

    /// Fetches blobs for a given slot and transaction hash.
    #[instrument(skip(self), target = "signet_blobber::BlobCacher", fields(retries = FETCH_RETRIES))]
    async fn fetch_blobs(
        &self,
        slot: u64,
        tx_hash: B256,
        versioned_hashes: Vec<B256>,
    ) -> FetchResult<Blobs> {
        // Cache hit
        if let Some(blobs) = self.cache.lock().unwrap().get(&(slot, tx_hash)) {
            info!(target: "signet_blobber::BlobCacher", "Cache hit");
            return Ok(blobs.clone());
        }

        // Cache miss, use the fetcher to retrieve blobs
        // Retry fetching blobs up to `FETCH_RETRIES` times
        for attempt in 1..=FETCH_RETRIES {
            let blobs = self.fetcher.fetch_blobs(slot, tx_hash, &versioned_hashes).await;

            match blobs {
                Ok(blobs) => {
                    self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
                    return Ok(blobs);
                }
                Err(BlobFetcherError::Ignorable(e)) => {
                    warn!(target: "signet_blobber::BlobCacher", attempt, %e, "Blob fetch attempt failed.");
                    tokio::time::sleep(BETWEEN_RETRIES).await;
                    continue;
                }
                Err(e) => return Err(e), // unrecoverable error
            }
        }
        error!(target: "signet_blobber::BlobCacher", "All fetch attempts failed");
        Err(BlobFetcherError::missing_sidecar(tx_hash))
    }

    /// Processes a single cache instruction.
    async fn handle_inst(self: Arc<Self>, inst: CacheInst) {
        match inst {
            CacheInst::Retrieve { slot, tx_hash, versioned_hashes, resp } => {
                if let Ok(blobs) = self.fetch_blobs(slot, tx_hash, versioned_hashes).await {
                    // if the listener has gone away, that's okay; we just won't send the response
                    let _ = resp.send(blobs);
                }
            }
        }
    }

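    /// Main loop of the cache task: receives instructions from the
    /// [`CacheHandle`] and spawns a task to process each one.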
    async fn task_future(self: Arc<Self>, mut insts: mpsc::Receiver<CacheInst>) {
        while let Some(inst) = insts.recv().await {
            let this = Arc::clone(&self);
            tokio::spawn(async move {
                this.handle_inst(inst).await;
            });
        }
    }

    /// Spawns the cache task to handle incoming instructions.
    ///
    /// # Panics
    /// This function will panic if called outside of a Tokio runtime.
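    ///
    /// # Example
    ///
    /// A minimal sketch of wiring the cacher into a Tokio runtime, assuming
    /// `fetcher` is an already-constructed [`crate::BlobFetcher`]:
    ///
    /// ```ignore
    /// let cacher = BlobCacher::new(fetcher);
    /// let handle = cacher.spawn();
    /// // `handle` is cheaply cloneable and can be shared across tasks that
    /// // need blobs; see `CacheHandle::fetch_blobs`.
    /// ```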
    pub fn spawn(self) -> CacheHandle {
        let (sender, insts) = mpsc::channel(12);
        tokio::spawn(Arc::new(self).task_future(insts));
        CacheHandle { sender }
    }
}