Skip to content

Commit

Permalink
Fix broken bridge stats reporting & add filter-clients flag (ethereum…
Browse files Browse the repository at this point in the history
…#1493)

* fix: buggy state bridge global offer report & add filter-clients flag
  • Loading branch information
njgheorghita authored Sep 27, 2024
1 parent 9e71e5c commit 3142c5c
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 55 deletions.
101 changes: 63 additions & 38 deletions portal-bridge/src/bridge/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct StateBridge {
// Used to request all interested enrs in the network from census process.
census_tx: mpsc::UnboundedSender<EnrsRequest>,
// Global offer report for tallying total performance of state bridge
offer_report: Arc<Mutex<OfferReport>>,
global_offer_report: Arc<Mutex<GlobalOfferReport>>,
}

impl StateBridge {
Expand All @@ -58,14 +58,14 @@ impl StateBridge {
) -> anyhow::Result<Self> {
let metrics = BridgeMetricsReporter::new("state".to_string(), &format!("{mode:?}"));
let offer_semaphore = Arc::new(Semaphore::new(offer_limit));
let offer_report = OfferReport::new(None, usize::MAX);
let global_offer_report = GlobalOfferReport::default();
Ok(Self {
mode,
portal_client,
metrics,
offer_semaphore,
census_tx,
offer_report: Arc::new(Mutex::new(offer_report)),
global_offer_report: Arc::new(Mutex::new(global_offer_report)),
})
}

Expand Down Expand Up @@ -101,7 +101,10 @@ impl StateBridge {
for block_number in 0..=last_block {
info!("Gossipping state for block at height: {block_number}");
// display global state offer report
self.offer_report.lock().expect("to acquire lock").report();
self.global_offer_report
.lock()
.expect("to acquire lock")
.report();

// process block
let RootWithTrieDiff {
Expand Down Expand Up @@ -184,6 +187,11 @@ impl StateBridge {
// This is used for gossiping storage trie diffs
trin_execution.database.storage_cache.clear();
}
// display final global state offer report
self.global_offer_report
.lock()
.expect("to acquire lock")
.report();
temp_directory.close()?;
Ok(())
}
Expand Down Expand Up @@ -248,7 +256,7 @@ impl StateBridge {
return;
};
let offer_report = Arc::new(Mutex::new(OfferReport::new(
Some(content_key.clone()),
content_key.clone(),
enrs.len(),
)));
for enr in enrs.clone() {
Expand All @@ -258,7 +266,7 @@ impl StateBridge {
let content_value = content_value.clone();
let offer_report = offer_report.clone();
let metrics = self.metrics.clone();
let global_offer_report = self.offer_report.clone();
let global_offer_report = self.global_offer_report.clone();
tokio::spawn(async move {
let timer = metrics.start_process_timer("spawn_offer_state_proof");
match timeout(
Expand All @@ -277,7 +285,7 @@ impl StateBridge {
global_offer_report
.lock()
.expect("to acquire lock")
.update(&enr, &result);
.update(&result);
offer_report
.lock()
.expect("to acquire lock")
Expand All @@ -287,7 +295,7 @@ impl StateBridge {
global_offer_report
.lock()
.expect("to acquire lock")
.update(&enr, &OfferTrace::Failed);
.update(&OfferTrace::Failed);
offer_report
.lock()
.expect("to acquire lock")
Expand All @@ -299,7 +307,7 @@ impl StateBridge {
global_offer_report
.lock()
.expect("to acquire lock")
.update(&enr, &OfferTrace::Failed);
.update(&OfferTrace::Failed);
offer_report
.lock()
.expect("to acquire lock")
Expand All @@ -322,18 +330,53 @@ impl StateBridge {
}
}

// Individual report for outcomes of offering a state content key
/// Global report for outcomes of offering state content keys from long-running state bridge.
///
/// Only running counts are kept here; each call to [`GlobalOfferReport::update`] bumps
/// exactly one of the three counters.
#[derive(Default)]
struct GlobalOfferReport {
    /// Number of offers recorded with an `OfferTrace::Success` trace.
    success: usize,
    /// Number of offers recorded with an `OfferTrace::Failed` trace.
    failed: usize,
    /// Number of offers recorded with an `OfferTrace::Declined` trace.
    declined: usize,
}

impl GlobalOfferReport {
    /// Records the outcome of a single offer by incrementing the matching counter.
    fn update(&mut self, trace: &OfferTrace) {
        // Select the counter first, then bump it in one place.
        let counter = match trace {
            OfferTrace::Success(_) => &mut self.success,
            OfferTrace::Failed => &mut self.failed,
            OfferTrace::Declined => &mut self.declined,
        };
        *counter += 1;
    }

    /// Logs the tally at `info` level: the total number of offers plus, for each outcome,
    /// its share of the total and its absolute count.
    ///
    /// Nothing is logged while no offers have been recorded. Shares are computed with
    /// integer division, so they are truncated and need not add up to exactly 100.
    fn report(&self) {
        let &Self {
            success,
            failed,
            declined,
        } = self;
        let total = success + failed + declined;

        // Only report (and only divide by `total`) once at least one offer was recorded.
        if total > 0 {
            let percent = |count: usize| 100 * count / total;
            info!(
                "State offer report: Total Offers: {}. Successful: {}% ({}). Declined: {}% ({}). Failed: {}% ({}).",
                total,
                percent(success),
                success,
                percent(declined),
                declined,
                percent(failed),
                failed,
            );
        }
    }
}

/// Individual report for outcomes of offering a state content key
struct OfferReport {
// content key as None is reserved for the global state offer report
content_key: Option<StateContentKey>,
content_key: StateContentKey,
/// total number of enrs interested in the content key
total: usize,
success: Vec<Enr>,
failed: Vec<Enr>,
declined: Vec<Enr>,
}

impl OfferReport {
fn new(content_key: Option<StateContentKey>, total: usize) -> Self {
fn new(content_key: StateContentKey, total: usize) -> Self {
Self {
content_key,
total,
Expand All @@ -359,38 +402,20 @@ impl OfferReport {

fn report(&self) {
if enabled!(Level::DEBUG) {
if let Some(content_key) = &self.content_key {
debug!(
"Successfully offered to {}/{} peers. Content key: {}. Declined: {:?}. Failed: {:?}",
self.success.len(),
self.total,
content_key.to_hex(),
self.declined,
self.failed,
);
} else {
debug!(
"State offer report: {}/{} peers. Declined: {:?}. Failed: {:?}",
self.success.len(),
self.total,
self.declined,
self.failed,
);
}
} else if let Some(content_key) = &self.content_key {
info!(
"Successfully offered to {}/{} peers. Content key: {}. Declined: {}. Failed: {}.",
debug!(
"Successfully offered to {}/{} peers. Content key: {}. Declined: {:?}. Failed: {:?}",
self.success.len(),
self.total,
content_key.to_hex(),
self.declined.len(),
self.failed.len(),
self.content_key.to_hex(),
self.declined,
self.failed,
);
} else {
info!(
"State offer report: {}/{} peers. Declined: {}. Failed: {}.",
"Successfully offered to {}/{} peers. Content key: {}. Declined: {}. Failed: {}.",
self.success.len(),
self.total,
self.content_key.to_hex(),
self.declined.len(),
self.failed.len(),
);
Expand Down
37 changes: 29 additions & 8 deletions portal-bridge/src/census.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::{
};
use tracing::{error, info, warn};

use crate::cli::{BridgeConfig, ClientType};
use ethportal_api::{
generate_random_remote_enr,
jsonrpsee::http_client::HttpClient,
Expand Down Expand Up @@ -37,7 +38,7 @@ const LIVENESS_CHECK_DELAY: Duration = Duration::from_secs(3600);
/// The maximum number of enrs to return in a response,
/// limiting the number of OFFER requests spawned by the bridge
/// for each piece of content
const ENRS_RESPONSE_LIMIT: usize = 4;
pub const ENR_OFFER_LIMIT: usize = 4;

/// The census is responsible for maintaining a list of known peers in the network,
/// checking their liveness, updating their data radius, iterating through their
Expand All @@ -50,11 +51,15 @@ pub struct Census {
}

impl Census {
pub fn new(client: HttpClient, census_rx: mpsc::UnboundedReceiver<EnrsRequest>) -> Self {
pub fn new(
client: HttpClient,
census_rx: mpsc::UnboundedReceiver<EnrsRequest>,
bridge_config: &BridgeConfig,
) -> Self {
Self {
history: Network::new(client.clone(), Subnetwork::History),
state: Network::new(client.clone(), Subnetwork::State),
beacon: Network::new(client.clone(), Subnetwork::Beacon),
history: Network::new(client.clone(), Subnetwork::History, bridge_config),
state: Network::new(client.clone(), Subnetwork::State, bridge_config),
beacon: Network::new(client.clone(), Subnetwork::Beacon, bridge_config),
census_rx,
}
}
Expand All @@ -64,7 +69,6 @@ impl Census {
pub async fn init(&mut self) -> Result<(), CensusError> {
// currently, the census is only initialized for the state network
// only initialized networks will yield inside `run()` loop
info!("Initializing state network census");
self.state.init().await;
if self.state.peers.is_empty() {
return Err(CensusError::FailedInitialization);
Expand Down Expand Up @@ -142,14 +146,18 @@ struct Network {
peers: HashMapDelay<[u8; 32], (Enr, Distance)>,
client: HttpClient,
subnetwork: Subnetwork,
filter_clients: Vec<ClientType>,
enr_offer_limit: usize,
}

impl Network {
fn new(client: HttpClient, subnetwork: Subnetwork) -> Self {
fn new(client: HttpClient, subnetwork: Subnetwork, bridge_config: &BridgeConfig) -> Self {
Self {
peers: HashMapDelay::new(LIVENESS_CHECK_DELAY),
client,
subnetwork,
filter_clients: bridge_config.filter_clients.to_vec(),
enr_offer_limit: bridge_config.enr_offer_limit,
}
}

Expand All @@ -161,6 +169,13 @@ impl Network {
// peers. However, since the census continues to iterate through the peers after initialization,
// the initialization is just to reach a critical mass of peers so that gossip can begin.
async fn init(&mut self) {
match self.filter_clients.is_empty() {
true => info!("Initializing {} network census", self.subnetwork),
false => info!(
"Initializing {} network census with filtered clients: {:?}",
self.subnetwork, self.filter_clients
),
}
let (_, random_enr) = generate_random_remote_enr();
let Ok(initial_enrs) = self
.subnetwork
Expand Down Expand Up @@ -216,6 +231,12 @@ impl Network {
// since the same enr might appear multiple times between the
// routing tables of different peers.
async fn liveness_check(&mut self, enr: Enr) -> bool {
// skip if client type is filtered
let client_type = ClientType::from(&enr);
if self.filter_clients.contains(&client_type) {
return false;
}

// if enr is already registered, check if delay map deadline has expired
if let Some(deadline) = self.peers.deadline(&enr.node_id().raw()) {
if Instant::now() < deadline {
Expand Down Expand Up @@ -256,7 +277,7 @@ impl Network {
None
}
})
.take(ENRS_RESPONSE_LIMIT)
.take(self.enr_offer_limit)
.collect())
}
}
Expand Down
Loading

0 comments on commit 3142c5c

Please sign in to comment.