Skip to content

Commit 14c9300

Browse files
committed
refactors
- improves poll interval handling - cleans up and updates comments - removes arc usage
1 parent 0bb1798 commit 14c9300

File tree

1 file changed

+29
-27
lines changed

1 file changed

+29
-27
lines changed

src/tasks/bundler.rs

+29-27
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
1-
//! Bundler service responsible for managing bundles.
2-
use std::sync::Arc;
3-
4-
use super::oauth::Authenticator;
5-
1+
//! Bundler service responsible for fetching bundles and sending them to the simulator.
62
pub use crate::config::BuilderConfig;
7-
3+
use crate::tasks::oauth::Authenticator;
84
use oauth2::TokenResponse;
95
use reqwest::Url;
106
use serde::{Deserialize, Serialize};
117
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
128
use tokio::task::JoinHandle;
139
use zenith_types::ZenithEthBundle;
1410

15-
/// Holds a Signet bundle from the cache that has a unique identifier
16-
/// and a Zenith bundle
11+
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
1712
#[derive(Debug, Clone, Serialize, Deserialize)]
1813
pub struct Bundle {
1914
/// Cache identifier for the bundle
@@ -22,38 +17,37 @@ pub struct Bundle {
2217
pub bundle: ZenithEthBundle,
2318
}
2419

25-
impl PartialEq for Bundle {
26-
fn eq(&self, other: &Self) -> bool {
27-
self.id == other.id
28-
}
29-
}
30-
31-
impl Eq for Bundle {}
32-
3320
/// Response from the tx-pool containing a list of bundles.
3421
#[derive(Debug, Clone, Serialize, Deserialize)]
3522
pub struct TxPoolBundleResponse {
36-
/// Bundle responses are availabel on the bundles property
23+
/// Bundle responses are available on the bundles property.
3724
pub bundles: Vec<Bundle>,
3825
}
3926

40-
/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles.
27+
/// The BundlePoller polls the tx-pool for bundles.
4128
#[derive(Debug, Clone)]
4229
pub struct BundlePoller {
4330
/// The builder configuration values.
4431
pub config: BuilderConfig,
4532
/// Authentication module that periodically fetches and stores auth tokens.
4633
pub authenticator: Authenticator,
34+
/// Defines the interval at which the bundler polls the tx-pool for bundles.
35+
pub poll_interval_ms: u64,
4736
}
4837

49-
/// Implements a poller for the block builder to pull bundles from the tx cache.
38+
/// Implements a poller for the block builder to pull bundles from the tx-pool.
5039
impl BundlePoller {
5140
/// Creates a new BundlePoller from the provided builder config.
5241
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
53-
Self { config: config.clone(), authenticator }
42+
Self { config: config.clone(), authenticator, poll_interval_ms: 1000 }
5443
}
5544

56-
/// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
45+
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
46+
pub fn new_with_poll_interval_ms(config: &BuilderConfig, authenticator: Authenticator) -> Self {
47+
Self { config: config.clone(), authenticator, poll_interval_ms: 1000 }
48+
}
49+
50+
/// Fetches bundles from the transaction cache and returns them.
5751
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
5852
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
5953
let token = self.authenticator.fetch_oauth_token().await?;
@@ -71,23 +65,31 @@ impl BundlePoller {
7165
Ok(resp.bundles)
7266
}
7367

74-
/// Spawns a task that simply sends out any bundles it ever finds
75-
pub fn spawn(mut self) -> (UnboundedReceiver<Arc<Bundle>>, JoinHandle<()>) {
68+
/// Spawns a task that sends bundles it finds to its channel sender.
69+
pub fn spawn(mut self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
7670
let (outbound, inbound) = unbounded_channel();
7771
let jh = tokio::spawn(async move {
7872
loop {
7973
if let Ok(bundles) = self.check_bundle_cache().await {
8074
tracing::debug!(count = ?bundles.len(), "found bundles");
81-
for bundle in bundles.iter() {
82-
if let Err(err) = outbound.send(Arc::new(bundle.clone())) {
83-
tracing::error!(err = ?err, "Failed to send bundle");
75+
for bundle in bundles.into_iter() {
76+
if let Err(err) = outbound.send(bundle) {
77+
tracing::error!(err = ?err, "Failed to send bundle - channel full");
8478
}
8579
}
8680
}
87-
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
81+
tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await;
8882
}
8983
});
9084

9185
(inbound, jh)
9286
}
9387
}
88+
89+
impl PartialEq for Bundle {
90+
fn eq(&self, other: &Self) -> bool {
91+
self.id == other.id
92+
}
93+
}
94+
95+
impl Eq for Bundle {}

0 commit comments

Comments
 (0)