Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ mod tests {
.unwrap(),
),
rpc_url,
rpc_urls: Vec::new(),
private_key: Some(private_key),
bento_api_url: None,
bonsai_api_key,
Expand Down
4 changes: 3 additions & 1 deletion crates/boundless-market/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ url = { workspace = true }
# Host dependencies
[target.'cfg(not(target_os = "zkvm"))'.dependencies]
derive_builder = "0.20.2"
alloy = { workspace = true, features = ["network", "node-bindings", "rpc-types", "providers", "transports", "sol-types", "contract", "signers", "signer-local"] }
alloy = { workspace = true, features = ["network", "node-bindings", "rpc-types", "providers", "transports", "sol-types", "contract", "signers", "signer-local", "rpc-client"] }
alloy-rpc-client = "1.0"
async-stream = { workspace = true }
async-trait = "0.1"
aws-sdk-s3 = "1.34"
Expand All @@ -48,6 +49,7 @@ serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
tower = "0.5"
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
siwe = { version = "0.6", features = ["serde"] }
Expand Down
160 changes: 133 additions & 27 deletions crates/boundless-market/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ use alloy::{
local::{LocalSignerError, PrivateKeySigner},
Signer,
},
transports::{http::Http, layers::FallbackLayer},
};
use alloy_primitives::{Signature, B256};
use alloy_rpc_client::RpcClient;
use anyhow::{anyhow, bail, Context, Result};
use risc0_aggregation::SetInclusionReceipt;
use risc0_ethereum_contracts::set_verifier::SetVerifierService;
use risc0_zkvm::{sha::Digest, ReceiptClaim};
use tower::ServiceBuilder;
use url::Url;

use crate::{
Expand Down Expand Up @@ -57,6 +60,7 @@ use crate::{
pub struct ClientBuilder<St = NotProvided, Si = NotProvided> {
deployment: Option<Deployment>,
rpc_url: Option<Url>,
rpc_urls: Vec<Url>,
signer: Option<Si>,
storage_provider: Option<St>,
tx_timeout: Option<std::time::Duration>,
Expand All @@ -76,6 +80,7 @@ impl<St, Si> Default for ClientBuilder<St, Si> {
Self {
deployment: None,
rpc_url: None,
rpc_urls: Vec::new(),
signer: None,
storage_provider: None,
tx_timeout: None,
Expand All @@ -100,24 +105,64 @@ pub trait ClientProviderBuilder {
/// Error returned by methods on this [ClientProviderBuilder].
type Error;

/// Build a provider connected to the given RPC URL.
/// Build a provider connected to the given RPC URLs.
fn build_provider(
&self,
rpc_url: impl AsRef<str>,
rpc_urls: Vec<Url>,
) -> impl Future<Output = Result<DynProvider, Self::Error>>;

/// Get the default signer address that will be used by this provider, or `None` if no signer.
fn signer_address(&self) -> Option<Address>;
}

impl<St, Si> ClientBuilder<St, Si> {
/// Collect all RPC URLs by merging rpc_url and rpc_urls.
/// If both are provided, they are merged into a single list.
fn collect_rpc_urls(&self) -> Result<Vec<Url>, anyhow::Error> {
let mut all_urls = Vec::new();

// Add the primary RPC URL if set
if let Some(ref rpc_url) = self.rpc_url {
all_urls.push(rpc_url.clone());
}

// Add any additional URLs from rpc_urls
all_urls.extend(self.rpc_urls.clone());

Ok(all_urls)
}

/// Build a custom RPC client transport with fallback support for multiple URLs.
fn build_fallback_transport(&self, urls: &[Url]) -> Result<RpcClient, anyhow::Error> {
// Create HTTP transports for each URL
let transports: Vec<Http<_>> = urls.iter().map(|url| Http::new(url.clone())).collect();

// Configure FallbackLayer with all transports active
let active_count =
std::num::NonZeroUsize::new(transports.len()).unwrap_or(std::num::NonZeroUsize::MIN);
let fallback_layer = FallbackLayer::default().with_active_transport_count(active_count);

tracing::info!(
"Configuring provider with fallback support: {} URLs: {:?}",
urls.len(),
urls
);

// Build transport with fallback layer
let transport = ServiceBuilder::new().layer(fallback_layer).service(transports);

// Create RPC client with the transport
Ok(RpcClient::builder().transport(transport, false))
}
}

impl<St, Si> ClientProviderBuilder for ClientBuilder<St, Si>
where
Si: TxSigner<Signature> + Send + Sync + Clone + 'static,
{
type Error = anyhow::Error;

async fn build_provider(&self, rpc_url: impl AsRef<str>) -> Result<DynProvider, Self::Error> {
let rpc_url = rpc_url.as_ref();
async fn build_provider(&self, rpc_urls: Vec<Url>) -> Result<DynProvider, Self::Error> {
let provider = match self.signer.clone() {
Some(signer) => {
let dynamic_gas_filler = DynamicGasFiller::new(
Expand All @@ -127,22 +172,48 @@ where
signer.address(),
);

// Connect the RPC provider.
let base_provider = ProviderBuilder::new()
.disable_recommended_fillers()
.filler(ChainIdFiller::default())
.filler(dynamic_gas_filler)
.layer(BalanceAlertLayer::new(self.balance_alerts.clone().unwrap_or_default()))
.connect(rpc_url)
.await
.with_context(|| format!("failed to connect provider to {rpc_url}"))?;
// Build provider without erasing first (NonceProvider needs FillProvider)
let balance_alerts = self.balance_alerts.clone().unwrap_or_default();

let base_provider = if rpc_urls.len() > 1 {
// Multiple URLs - use fallback transport
let client = self.build_fallback_transport(&rpc_urls)?;
ProviderBuilder::new()
.disable_recommended_fillers()
.filler(ChainIdFiller::default())
.filler(dynamic_gas_filler)
.layer(BalanceAlertLayer::new(balance_alerts))
.connect_client(client)
} else {
// Single URL - use regular provider
let url = rpc_urls.first().unwrap();
ProviderBuilder::new()
.disable_recommended_fillers()
.filler(ChainIdFiller::default())
.filler(dynamic_gas_filler)
.layer(BalanceAlertLayer::new(balance_alerts))
.connect(url.as_str())
.await
.with_context(|| format!("failed to connect provider to {url}"))?
};

NonceProvider::new(base_provider, EthereumWallet::from(signer)).erased()
}
None => ProviderBuilder::new()
.connect(rpc_url)
.await
.with_context(|| format!("failed to connect provider to {rpc_url}"))?
.erased(),
None => {
if rpc_urls.len() > 1 {
// Multiple URLs - use fallback transport
let client = self.build_fallback_transport(&rpc_urls)?;
ProviderBuilder::new().connect_client(client).erased()
} else {
// Single URL - use regular provider
let url = rpc_urls.first().context("no RPC URL provided")?;
ProviderBuilder::new()
.connect(url.as_str())
.await
.with_context(|| format!("failed to connect provider to {url}"))?
.erased()
}
}
};
Ok(provider)
}
Expand All @@ -155,13 +226,20 @@ where
impl<St> ClientProviderBuilder for ClientBuilder<St, NotProvided> {
type Error = anyhow::Error;

async fn build_provider(&self, rpc_url: impl AsRef<str>) -> Result<DynProvider, Self::Error> {
let rpc_url = rpc_url.as_ref();
let provider = ProviderBuilder::new()
.connect(rpc_url)
.await
.with_context(|| format!("failed to connect provider to {rpc_url}"))?
.erased();
async fn build_provider(&self, rpc_urls: Vec<Url>) -> Result<DynProvider, Self::Error> {
let provider = if rpc_urls.len() > 1 {
// Multiple URLs - use fallback transport
let client = self.build_fallback_transport(&rpc_urls)?;
ProviderBuilder::new().connect_client(client).erased()
} else {
// Single URL - use regular provider
let url = rpc_urls.first().unwrap();
ProviderBuilder::new()
.connect(url.as_str())
.await
.with_context(|| format!("failed to connect provider to {url}"))?
.erased()
};
Ok(provider)
}

Expand All @@ -179,8 +257,8 @@ impl<St, Si> ClientBuilder<St, Si> {
St: Clone,
Self: ClientProviderBuilder<Error = anyhow::Error>,
{
let rpc_url = self.rpc_url.clone().context("rpc_url is not set on ClientBuilder")?;
let provider = self.build_provider(&rpc_url).await?;
let all_urls = self.collect_rpc_urls()?;
let provider = self.build_provider(all_urls).await?;

// Resolve the deployment information.
let chain_id =
Expand Down Expand Up @@ -265,6 +343,32 @@ impl<St, Si> ClientBuilder<St, Si> {
Self { rpc_url: Some(rpc_url), ..self }
}

/// Set additional RPC URLs for automatic failover.
///
/// When multiple URLs are provided (via `with_rpc_url` and/or `with_rpc_urls`),
/// they are merged into a single list. If 2+ URLs are provided, the client will
/// use Alloy's FallbackLayer to distribute requests across multiple RPC endpoints
/// with automatic failover. If only 1 URL is provided, a regular provider is used.
///
/// # Example
/// ```rust
/// # use boundless_market::Client;
/// # use url::Url;
/// // Multiple URLs - uses fallback provider
/// Client::builder()
/// .with_rpc_urls(vec![
/// Url::parse("https://rpc2.example.com").unwrap(),
/// Url::parse("https://rpc3.example.com").unwrap(),
/// ]);
///
/// // Single URL - uses regular provider
/// Client::builder()
/// .with_rpc_urls(vec![Url::parse("https://rpc.example.com").unwrap()]);
/// ```
pub fn with_rpc_urls(self, rpc_urls: Vec<Url>) -> Self {
Self { rpc_urls, ..self }
}

/// Set the signer from the given private key.
/// ```rust
/// # use boundless_market::Client;
Expand Down Expand Up @@ -305,6 +409,7 @@ impl<St, Si> ClientBuilder<St, Si> {
deployment: self.deployment,
storage_provider: self.storage_provider,
rpc_url: self.rpc_url,
rpc_urls: self.rpc_urls,
tx_timeout: self.tx_timeout,
balance_alerts: self.balance_alerts,
offer_layer_config: self.offer_layer_config,
Expand Down Expand Up @@ -336,6 +441,7 @@ impl<St, Si> ClientBuilder<St, Si> {
storage_provider,
deployment: self.deployment,
rpc_url: self.rpc_url,
rpc_urls: self.rpc_urls,
signer: self.signer,
tx_timeout: self.tx_timeout,
balance_alerts: self.balance_alerts,
Expand Down
1 change: 1 addition & 0 deletions crates/broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] }
tokio-util = { workspace = true }
toml = "0.8"
tower = "0.5"
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
url = { workspace = true }
Expand Down
49 changes: 47 additions & 2 deletions crates/broker/src/bin/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use alloy::{
primitives::utils::parse_ether,
providers::{fillers::ChainIdFiller, network::EthereumWallet, ProviderBuilder, WalletProvider},
rpc::client::RpcClient,
transports::layers::RetryBackoffLayer,
transports::{
http::Http,
layers::{FallbackLayer, RetryBackoffLayer},
},
};
use anyhow::{Context, Result};
use boundless_market::{
Expand All @@ -27,6 +30,7 @@ use boundless_market::{
};
use broker::{Args, Broker, Config, CustomRetryPolicy};
use clap::Parser;
use tower::ServiceBuilder;
use tracing_subscriber::fmt::format::FmtSpan;
use url::Url;

Expand Down Expand Up @@ -70,13 +74,54 @@ async fn main() -> Result<()> {

let wallet = EthereumWallet::from(private_key.clone());

// Collect all RPC URLs (merge rpc_url and rpc_urls) and deduplicate
let mut all_rpc_urls = Vec::new();
let mut seen = std::collections::HashSet::new();

// Add rpc_url first if not already seen
if seen.insert(args.rpc_url.clone()) {
all_rpc_urls.push(args.rpc_url.clone());
}

// Add rpc_urls, skipping duplicates
for url in &args.rpc_urls {
if seen.insert(url.clone()) {
all_rpc_urls.push(url.clone());
}
}

let retry_layer = RetryBackoffLayer::new_with_policy(
args.rpc_retry_max,
args.rpc_retry_backoff,
args.rpc_retry_cu,
CustomRetryPolicy,
);
let client = RpcClient::builder().layer(retry_layer).http(args.rpc_url.clone());

// Build RPC client with fallback support if multiple URLs are provided
let client = if all_rpc_urls.len() > 1 {
// Multiple URLs - use fallback transport
let transports: Vec<Http<_>> =
all_rpc_urls.iter().map(|url| Http::new(url.clone())).collect();

let active_count =
std::num::NonZeroUsize::new(transports.len()).unwrap_or(std::num::NonZeroUsize::MIN);
let fallback_layer = FallbackLayer::default().with_active_transport_count(active_count);

tracing::info!(
"Configuring broker with fallback RPC support: {} URLs: {:?}",
all_rpc_urls.len(),
all_rpc_urls
);

let transport =
ServiceBuilder::new().layer(retry_layer).layer(fallback_layer).service(transports);

RpcClient::builder().transport(transport, false)
} else {
// Single URL - use regular provider
tracing::info!("Configuring broker with single RPC URL: {}", args.rpc_url);
RpcClient::builder().layer(retry_layer).http(args.rpc_url.clone())
};
let balance_alerts_layer = BalanceAlertLayer::new(BalanceAlertConfig {
watch_address: wallet.default_signer().address(),
warn_threshold: config
Expand Down
Loading
Loading