Skip to content

Commit feae877

Browse files
authored
thing: thing thing the thing (#205)
* thing: thing thing the thing * refactor: remove waiting span * chore; builder init span * refactor: use new cache and quiet errors
1 parent 2dffd30 commit feae877

File tree

9 files changed

+418
-508
lines changed

9 files changed

+418
-508
lines changed

Cargo.lock

Lines changed: 330 additions & 389 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
5656
tokio-stream = "0.1.17"
5757
url = "2.5.4"
5858
thiserror = "2.0.17"
59+
futures-util = "0.3.31"
5960

6061
[dev-dependencies]
6162
alloy-hardforks = "0.4.0"

bin/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use tokio::select;
1313
#[tokio::main(flavor = "multi_thread")]
1414
async fn main() -> eyre::Result<()> {
1515
let _guard = init4_bin_base::init4();
16-
let init_span_guard = info_span!("builder initialization");
16+
let init_span_guard = info_span!("builder initialization").entered();
17+
1718
builder::config_from_env();
1819

1920
// Set up env and metrics tasks

src/quincey.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,23 @@ pub enum QuinceyError {
2525

2626
/// Error contacting the remote quincey API.
2727
#[error("Error contacting quincey API: {0}")]
28-
Remote(#[from] reqwest::Error),
28+
Remote(reqwest::Error),
2929

3030
/// Error with the owned signet.
3131
#[error("Error with owned signet: {0}")]
3232
Owned(#[from] eyre::Report),
3333
}
3434

35+
impl From<reqwest::Error> for QuinceyError {
36+
fn from(err: reqwest::Error) -> Self {
37+
if err.status() == Some(reqwest::StatusCode::FORBIDDEN) {
38+
QuinceyError::NotOurSlot
39+
} else {
40+
QuinceyError::Remote(err)
41+
}
42+
}
43+
}
44+
3545
/// A quincey client for making requests to the Quincey API.
3646
#[derive(Debug, Clone)]
3747
pub enum Quincey {
@@ -89,23 +99,9 @@ impl Quincey {
8999

90100
let token = token.secret().await?;
91101

92-
let resp = client
93-
.post(url.clone())
94-
.json(sig_request)
95-
.bearer_auth(token)
96-
.send()
97-
.await
98-
.map_err(QuinceyError::Remote)?;
99-
100-
if resp.status() == reqwest::StatusCode::FORBIDDEN {
101-
return Err(QuinceyError::NotOurSlot);
102-
}
102+
let resp = client.post(url.clone()).json(sig_request).bearer_auth(token).send().await?;
103103

104-
resp.error_for_status()
105-
.map_err(QuinceyError::Remote)?
106-
.json::<SignResponse>()
107-
.await
108-
.map_err(QuinceyError::Remote)
104+
resp.error_for_status()?.json::<SignResponse>().await.map_err(QuinceyError::Remote)
109105
}
110106

111107
/// Get a signature for the provided request, by either using the owned

src/tasks/cache/bundle.rs

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
//! Bundler service responsible for fetching bundles and sending them to the simulator.
22
use crate::config::BuilderConfig;
3-
use init4_bin_base::perms::SharedToken;
4-
use reqwest::{Client, Url};
5-
use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse};
3+
use init4_bin_base::perms::tx_cache::BuilderTxCache;
4+
use signet_tx_cache::{TxCacheError, types::TxCacheBundle};
65
use tokio::{
76
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
87
task::JoinHandle,
98
time::{self, Duration},
109
};
11-
use tracing::{Instrument, debug, error, trace, trace_span};
10+
use tracing::{Instrument, error, trace, trace_span};
1211

1312
/// Poll interval for the bundle poller in milliseconds.
1413
const POLL_INTERVAL_MS: u64 = 1000;
@@ -18,10 +17,10 @@ const POLL_INTERVAL_MS: u64 = 1000;
1817
pub struct BundlePoller {
1918
/// The builder configuration values.
2019
config: &'static BuilderConfig,
21-
/// Authentication module that periodically fetches and stores auth tokens.
22-
token: SharedToken,
23-
/// Holds a Reqwest client
24-
client: Client,
20+
21+
/// Client for the tx cache.
22+
tx_cache: BuilderTxCache,
23+
2524
/// Defines the interval at which the bundler polls the tx-pool for bundles.
2625
poll_interval_ms: u64,
2726
}
@@ -42,34 +41,37 @@ impl BundlePoller {
4241
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
4342
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
4443
let config = crate::config();
45-
let token = config.oauth_token();
46-
Self { config, token, client: Client::new(), poll_interval_ms }
47-
}
48-
49-
/// Fetches bundles from the transaction cache and returns them.
50-
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<TxCacheBundle>> {
51-
let bundle_url: Url = self.config.tx_pool_url.join("bundles")?;
52-
let token =
53-
self.token.secret().await.map_err(|e| eyre::eyre!("Failed to read token: {e}"))?;
54-
55-
self.client
56-
.get(bundle_url)
57-
.bearer_auth(token)
58-
.send()
59-
.await?
60-
.error_for_status()?
61-
.json()
62-
.await
63-
.map(|resp: TxCacheBundlesResponse| resp.bundles)
64-
.map_err(Into::into)
44+
let cache = signet_tx_cache::TxCache::new(config.tx_pool_url.clone());
45+
let tx_cache = BuilderTxCache::new(cache, config.oauth_token());
46+
Self { config, tx_cache, poll_interval_ms }
6547
}
6648

6749
/// Returns the poll duration as a [`Duration`].
6850
const fn poll_duration(&self) -> Duration {
6951
Duration::from_millis(self.poll_interval_ms)
7052
}
7153

72-
async fn task_future(mut self, outbound: UnboundedSender<TxCacheBundle>) {
54+
/// Checks the bundle cache for new bundles.
55+
pub async fn check_bundle_cache(&self) -> Result<Vec<TxCacheBundle>, TxCacheError> {
56+
let res = self.tx_cache.get_bundles().await;
57+
58+
match res {
59+
Ok(bundles) => {
60+
trace!(count = ?bundles.len(), "found bundles");
61+
Ok(bundles)
62+
}
63+
Err(TxCacheError::NotOurSlot) => {
64+
trace!("Not our slot to fetch bundles");
65+
Err(TxCacheError::NotOurSlot)
66+
}
67+
Err(err) => {
68+
error!(?err, "Failed to fetch bundles from tx-cache");
69+
Err(err)
70+
}
71+
}
72+
}
73+
74+
async fn task_future(self, outbound: UnboundedSender<TxCacheBundle>) {
7375
loop {
7476
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
7577

@@ -85,17 +87,10 @@ impl BundlePoller {
8587
// exit the span after the check.
8688
drop(_guard);
8789

88-
if let Ok(bundles) = self
89-
.check_bundle_cache()
90-
.instrument(span.clone())
91-
.await
92-
.inspect_err(|err| debug!(%err, "Error fetching bundles"))
93-
{
94-
let _guard = span.entered();
95-
trace!(count = ?bundles.len(), "found bundles");
90+
if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
9691
for bundle in bundles.into_iter() {
9792
if let Err(err) = outbound.send(bundle) {
98-
error!(err = ?err, "Failed to send bundle - channel is dropped");
93+
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
9994
break;
10095
}
10196
}

src/tasks/env.rs

Lines changed: 24 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -276,26 +276,7 @@ impl EnvTask {
276276

277277
drop(span);
278278

279-
// This span will be updated at the end of each loop iteration.
280-
let mut span = info_span!(
281-
parent: None,
282-
"SimEnv",
283-
confirmed.host.number = "initial",
284-
confirmed.ru.number = "initial",
285-
confirmed.ru.hash = "initial",
286-
confirmed.timestamp = 0,
287-
confirmed.slot = 0,
288-
sim.host.number = "initial",
289-
sim.ru.number = "initial",
290-
sim.slot = 0,
291-
trace_id = tracing::field::Empty,
292-
);
293-
294-
while let Some(rollup_header) = rollup_headers
295-
.next()
296-
.instrument(info_span!(parent: &span, "waiting_for_notification"))
297-
.await
298-
{
279+
while let Some(rollup_header) = rollup_headers.next().await {
299280
let host_block_number =
300281
self.config.constants.rollup_block_to_host_block_num(rollup_header.number);
301282
let rollup_block_number = rollup_header.number;
@@ -306,17 +287,23 @@ impl EnvTask {
306287
.expect("valid timestamp");
307288
let sim_slot = self.config.slot_calculator.current_slot().expect("chain has started");
308289

309-
// Populate span fields.
290+
// Create a `BlockConstruction` span
291+
let span = info_span!(
292+
parent: None,
293+
"BlockConstruction",
294+
confirmed.host.number = host_block_number,
295+
confirmed.host.hash = tracing::field::Empty,
296+
confirmed.ru.number = rollup_block_number,
297+
confirmed.ru.hash = %rollup_header.hash,
298+
confirmed.timestamp = rollup_header.timestamp,
299+
confirmed.slot = confirmed_slot,
300+
sim.host.number = host_block_number + 1,
301+
sim.ru.number = rollup_block_number + 1,
302+
sim.slot = sim_slot,
303+
trace_id = tracing::field::Empty,
304+
);
310305
// Ensure that we record the OpenTelemetry trace ID in the span.
311306
span.record("trace_id", span.context().span().span_context().trace_id().to_string());
312-
span.record("confirmed.host.number", host_block_number);
313-
span.record("confirmed.ru.number", rollup_block_number);
314-
span.record("confirmed.ru.hash", rollup_header.hash.to_string());
315-
span.record("confirmed.timestamp", rollup_header.timestamp);
316-
span.record("confirmed.slot", confirmed_slot);
317-
span.record("sim.slot", sim_slot);
318-
span.record("sim.host.number", host_block_number + 1);
319-
span.record("sim.ru.number", rollup_block_number + 1);
320307

321308
let (host_block_res, quincey_res) = tokio::join!(
322309
self.host_provider
@@ -334,15 +321,15 @@ impl EnvTask {
334321
Err(QuinceyError::NotOurSlot) => {
335322
span_debug!(
336323
span,
337-
"not our slot according to quincey - skipping block submission"
324+
"not our slot according to quincey - skipping block construction"
338325
);
339326
continue;
340327
}
341328
Err(err) => {
342329
span_error!(
343330
span,
344331
%err,
345-
"error during quincey preflight check - skipping block submission"
332+
"error during quincey preflight check - skipping block construction"
346333
);
347334
continue;
348335
}
@@ -352,16 +339,17 @@ impl EnvTask {
352339
let host_block_opt = res_unwrap_or_continue!(
353340
host_block_res,
354341
span,
355-
error!("error fetching previous host block - skipping block submission")
342+
error!("error fetching previous host block - skipping block construction")
356343
);
357344

358345
let host_header = opt_unwrap_or_continue!(
359346
host_block_opt,
360347
span,
361-
warn!("previous host block not found - skipping block submission")
348+
warn!("previous host block not found - skipping block construction")
362349
)
363-
.header
364-
.inner;
350+
.header;
351+
352+
span.record("confirmed.host.hash", host_header.hash.to_string());
365353

366354
if rollup_header.timestamp != host_header.timestamp {
367355
span_warn!(
@@ -375,7 +363,7 @@ impl EnvTask {
375363

376364
// Construct the block env using the previous block header
377365
let rollup_env = self.construct_rollup_env(rollup_header.into());
378-
let host_env = self.construct_host_env(host_header);
366+
let host_env = self.construct_host_env(host_header.inner);
379367

380368
span_info!(
381369
span,
@@ -389,20 +377,6 @@ impl EnvTask {
389377
tracing::debug!("receiver dropped, stopping task");
390378
break;
391379
}
392-
393-
// Create a new span for the next iteration.
394-
span = info_span!(
395-
"SimEnv",
396-
confirmed.host.number = host_block_number + 1,
397-
confirmed.ru.number = rollup_block_number + 1,
398-
confirmed.ru.hash = tracing::field::Empty,
399-
confirmed.timestamp = tracing::field::Empty,
400-
confirmed.slot = tracing::field::Empty,
401-
sim.host.number = host_block_number + 2,
402-
sim.ru.number = rollup_block_number + 2,
403-
sim.slot = tracing::field::Empty,
404-
trace_id = tracing::field::Empty,
405-
);
406380
}
407381
}
408382

src/tasks/submit/flashbots.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ impl FlashbotsTask {
112112
);
113113

114114
let tx = prep.prep_transaction(sim_result.prev_host()).await?;
115+
115116
let sendable = self
116117
.host_provider()
117118
.fill(tx.into_request())

src/tasks/submit/prep.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use alloy::{
1111
rpc::types::TransactionRequest,
1212
sol_types::SolCall,
1313
};
14+
use futures_util::FutureExt;
1415
use init4_bin_base::deps::metrics::counter;
1516
use signet_sim::BuiltBlock;
1617
use signet_types::{SignRequest, SignResponse};
@@ -103,14 +104,13 @@ impl<'a> SubmitPrep<'a> {
103104
}
104105

105106
/// Encodes the rollup block into a sidecar.
107+
#[instrument(skip(self), level = "debug")]
106108
async fn build_sidecar(&self) -> eyre::Result<BlobTransactionSidecar> {
107-
let sidecar = self.block.encode_blob::<SimpleCoder>().build()?;
108-
109-
Ok(sidecar)
109+
self.block.encode_blob::<SimpleCoder>().build().map_err(Into::into)
110110
}
111111

112112
/// Build a signature and header input for the host chain transaction.
113-
async fn build_input(&self) -> eyre::Result<Vec<u8>> {
113+
async fn build_input(&self) -> eyre::Result<Bytes> {
114114
let (v, r, s) = self.quincey_signature().await?;
115115

116116
let header = Zenith::BlockHeader {
@@ -120,19 +120,21 @@ impl<'a> SubmitPrep<'a> {
120120
rewardAddress: self.sig_request().ru_reward_address,
121121
blockDataHash: *self.block.contents_hash(),
122122
};
123-
debug!(?header.hostBlockNumber, "built zenith block header");
124-
125-
let data = Zenith::submitBlockCall { header, v, r, s, _4: Bytes::new() }.abi_encode();
123+
let call = Zenith::submitBlockCall { header, v, r, s, _4: Bytes::new() };
126124

127-
Ok(data)
125+
Ok(call.abi_encode().into())
128126
}
129127

130128
/// Create a new transaction request for the host chain.
131129
async fn new_tx_request(&self) -> eyre::Result<TransactionRequest> {
132-
let nonce =
133-
self.provider.get_transaction_count(self.provider.default_signer_address()).await?;
130+
let nonce_fut = self
131+
.provider
132+
.get_transaction_count(self.provider.default_signer_address())
133+
.into_future()
134+
.map(|res| res.map_err(Into::into));
134135

135-
let (sidecar, input) = try_join!(self.build_sidecar(), self.build_input())?;
136+
let (nonce, sidecar, input) =
137+
try_join!(nonce_fut, self.build_sidecar(), self.build_input())?;
136138

137139
let tx = TransactionRequest::default()
138140
.with_blob_sidecar(sidecar)
@@ -144,7 +146,6 @@ impl<'a> SubmitPrep<'a> {
144146
}
145147

146148
/// Prepares a transaction for submission to the host chain.
147-
#[instrument(skip_all, level = "debug")]
148149
pub async fn prep_transaction(self, prev_host: &Header) -> eyre::Result<Bumpable> {
149150
let req = self.new_tx_request().in_current_span().await?;
150151
Ok(Bumpable::new(req, prev_host))

tests/bundle_poller_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() -> Result<()> {
77
setup_logging();
88
setup_test_config();
99

10-
let mut bundle_poller = builder::tasks::cache::BundlePoller::new();
10+
let bundle_poller = builder::tasks::cache::BundlePoller::new();
1111

1212
let _ = bundle_poller.check_bundle_cache().await?;
1313

0 commit comments

Comments
 (0)