1- use crate :: { BlobFetcherError , Blobs , FetchResult } ;
1+ use crate :: { BlobberError , BlobberResult , Blobs , FetchResult } ;
22use alloy:: consensus:: { SidecarCoder , SimpleCoder , Transaction as _} ;
33use alloy:: eips:: eip7691:: MAX_BLOBS_PER_BLOCK_ELECTRA ;
44use alloy:: eips:: merge:: EPOCH_SLOTS ;
@@ -13,7 +13,7 @@ use std::{
1313 time:: Duration ,
1414} ;
1515use tokio:: sync:: { mpsc, oneshot} ;
16- use tracing:: { Instrument , debug , error, info, instrument} ;
16+ use tracing:: { Instrument , debug_span , error, info, instrument, trace } ;
1717
1818const BLOB_CACHE_SIZE : u32 = ( MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS ) as u32 ;
1919const CACHE_REQUEST_CHANNEL_SIZE : usize = ( MAX_BLOBS_PER_BLOCK_ELECTRA * 2 ) as usize ;
@@ -54,7 +54,7 @@ impl CacheHandle {
5454 slot : usize ,
5555 tx_hash : B256 ,
5656 version_hashes : Vec < B256 > ,
57- ) -> FetchResult < Blobs > {
57+ ) -> BlobberResult < Blobs > {
5858 let ( resp, receiver) = oneshot:: channel ( ) ;
5959
6060 self . send ( CacheInst :: Retrieve {
@@ -66,7 +66,7 @@ impl CacheHandle {
6666 } )
6767 . await ;
6868
69- receiver. await . map_err ( |_| BlobFetcherError :: missing_sidecar ( tx_hash) )
69+ receiver. await . map_err ( |_| BlobberError :: missing_sidecar ( tx_hash) )
7070 }
7171
7272 /// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
@@ -76,24 +76,24 @@ impl CacheHandle {
7676 slot : usize ,
7777 extract : & ExtractedEvent < ' _ , Receipt , BlockSubmitted > ,
7878 mut coder : C ,
79- ) -> FetchResult < Bytes > {
79+ ) -> BlobberResult < Bytes > {
8080 let tx_hash = extract. tx_hash ( ) ;
8181 let versioned_hashes = extract
8282 . tx
8383 . as_eip4844 ( )
84- . ok_or_else ( BlobFetcherError :: non_4844_transaction) ?
84+ . ok_or_else ( BlobberError :: non_4844_transaction) ?
8585 . blob_versioned_hashes ( )
8686 . expect ( "tx is eip4844" ) ;
8787
8888 let blobs = self . fetch_blobs ( slot, tx_hash, versioned_hashes. to_owned ( ) ) . await ?;
8989
9090 coder
9191 . decode_all ( blobs. as_ref ( ) )
92- . ok_or_else ( BlobFetcherError :: blob_decode_error) ?
92+ . ok_or_else ( BlobberError :: blob_decode_error) ?
9393 . into_iter ( )
9494 . find ( |data| keccak256 ( data) == extract. block_data_hash ( ) )
9595 . map ( Into :: into)
96- . ok_or_else ( || BlobFetcherError :: block_data_not_found ( tx_hash) )
96+ . ok_or_else ( || BlobberError :: block_data_not_found ( tx_hash) )
9797 }
9898
9999 /// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
@@ -102,12 +102,21 @@ impl CacheHandle {
102102 & self ,
103103 slot : usize ,
104104 extract : & ExtractedEvent < ' _ , Receipt , BlockSubmitted > ,
105- ) -> FetchResult < Bytes > {
105+ ) -> BlobberResult < Bytes > {
106106 self . fetch_and_decode_with_coder ( slot, extract, SimpleCoder :: default ( ) ) . await
107107 }
108108
109109 /// Fetch the blobs, decode them using the provided coder, and construct a
110110 /// Zenith block from the header and data.
111+ ///
112+ /// # Returns
113+ ///
114+ /// - `Ok(ZenithBlock)` if the block was successfully fetched and
115+ /// decoded.
116+ /// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
117+ /// decoded (e.g., due to a malformatted blob).
118+ /// - `Err(FetchError)` if there was an unrecoverable error fetching the
119+ /// blobs.
111120 pub async fn signet_block_with_coder < C : SidecarCoder > (
112121 & self ,
113122 host_block_number : u64 ,
@@ -116,13 +125,28 @@ impl CacheHandle {
116125 coder : C ,
117126 ) -> FetchResult < ZenithBlock > {
118127 let header = extract. ru_header ( host_block_number) ;
119- self . fetch_and_decode_with_coder ( slot, extract, coder)
120- . await
121- . map ( |buf| ZenithBlock :: from_header_and_data ( header, buf) )
128+ let block_data = match self . fetch_and_decode_with_coder ( slot, extract, coder) . await {
129+ Ok ( buf) => buf,
130+ Err ( BlobberError :: Decode ( _) ) => {
131+ trace ! ( "Failed to decode block data" ) ;
132+ Bytes :: default ( )
133+ }
134+ Err ( BlobberError :: Fetch ( err) ) => return Err ( err) ,
135+ } ;
136+ Ok ( ZenithBlock :: from_header_and_data ( header, block_data) )
122137 }
123138
124139 /// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
125140 /// Zenith block from the header and data.
141+ ///
142+ /// # Returns
143+ ///
144+ /// - `Ok(ZenithBlock)` if the block was successfully fetched and
145+ /// decoded.
146+ /// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
147+ /// decoded (e.g., due to a malformatted blob).
148+ /// - `Err(FetchError)` if there was an unrecoverable error fetching the
149+ /// blobs.
126150 pub async fn signet_block (
127151 & self ,
128152 host_block_number : u64 ,
@@ -159,7 +183,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
159183 slot : usize ,
160184 tx_hash : B256 ,
161185 versioned_hashes : Vec < B256 > ,
162- ) -> FetchResult < Blobs > {
186+ ) -> BlobberResult < Blobs > {
163187 // Cache hit
164188 if let Some ( blobs) = self . cache . lock ( ) . unwrap ( ) . get ( & ( slot, tx_hash) ) {
165189 info ! ( target: "signet_blobber::BlobCacher" , "Cache hit" ) ;
@@ -169,23 +193,21 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
169193 // Cache miss, use the fetcher to retrieve blobs
170194 // Retry fetching blobs up to `FETCH_RETRIES` times
171195 for attempt in 1 ..=FETCH_RETRIES {
172- let blobs = self . fetcher . fetch_blobs ( slot, tx_hash, & versioned_hashes) . await ;
173-
174- match blobs {
175- Ok ( blobs) => {
176- self . cache . lock ( ) . unwrap ( ) . insert ( ( slot, tx_hash) , blobs. clone ( ) ) ;
177- return Ok ( blobs) ;
178- }
179- Err ( BlobFetcherError :: Ignorable ( e) ) => {
180- debug ! ( target: "signet_blobber::BlobCacher" , attempt, %e, "Blob fetch attempt failed." ) ;
181- tokio:: time:: sleep ( BETWEEN_RETRIES ) . await ;
182- continue ;
183- }
184- Err ( e) => return Err ( e) , // unrecoverable error
185- }
196+ let Ok ( blobs) = self
197+ . fetcher
198+ . fetch_blobs ( slot, tx_hash, & versioned_hashes)
199+ . instrument ( debug_span ! ( "fetch_blobs_loop" , attempt) )
200+ . await
201+ else {
202+ tokio:: time:: sleep ( BETWEEN_RETRIES ) . await ;
203+ continue ;
204+ } ;
205+
206+ self . cache . lock ( ) . unwrap ( ) . insert ( ( slot, tx_hash) , blobs. clone ( ) ) ;
207+ return Ok ( blobs) ;
186208 }
187209 error ! ( target: "signet_blobber::BlobCacher" , "All fetch attempts failed" ) ;
188- Err ( BlobFetcherError :: missing_sidecar ( tx_hash) )
210+ Err ( BlobberError :: missing_sidecar ( tx_hash) )
189211 }
190212
191213 /// Processes the cache instructions.
0 commit comments