Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: various updates to the tx poller #67

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions src/tasks/tx_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use eyre::Error;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use serde_json::from_slice;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio::{sync::mpsc, task::JoinHandle, time};
use tracing::{Instrument, debug, trace};

/// Models a response from the transaction pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -46,22 +47,48 @@ impl TxPoller {
Ok(response.transactions)
}

/// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
pub fn spawn(mut self) -> (mpsc::UnboundedReceiver<TxEnvelope>, JoinHandle<()>) {
let (outbound, inbound) = mpsc::unbounded_channel();
let jh = tokio::spawn(async move {
loop {
if let Ok(transactions) = self.check_tx_cache().await {
tracing::debug!(count = ?transactions.len(), "found transactions");
async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
loop {
let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url);

// Enter the span for the next check.
let _guard = span.enter();

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
trace!("No receivers left, shutting down");
break;
}
// exit the span after the check.
drop(_guard);

match self.check_tx_cache().instrument(span.clone()).await {
Ok(transactions) => {
let _guard = span.entered();
debug!(count = ?transactions.len(), "found transactions");
for tx in transactions.into_iter() {
if let Err(err) = outbound.send(tx) {
tracing::error!(err = ?err, "failed to send transaction - channel is dropped.");
if outbound.send(tx).is_err() {
// If there are no receivers, we can shut down
trace!("No receivers left, shutting down");
break;
}
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await;
// If fetching was an error, we log and continue. We expect
// these to be transient network issues.
Err(e) => {
debug!(error = %e, "Error fetching transactions");
}
}
});
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
}
}

/// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
pub fn spawn(self) -> (mpsc::UnboundedReceiver<TxEnvelope>, JoinHandle<()>) {
let (outbound, inbound) = mpsc::unbounded_channel();
let jh = tokio::spawn(self.task_future(outbound));
(inbound, jh)
}
}
Loading