diff --git a/Cargo.lock b/Cargo.lock index 6b1830d..f5c2997 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2418,6 +2418,7 @@ dependencies = [ "solana-sdk", "tokio", "tokio-stream", + "tracing", "warp", "yellowstone-grpc-client", "yellowstone-grpc-proto", diff --git a/connector/Cargo.toml b/connector/Cargo.toml index 47915d0..b17594f 100644 --- a/connector/Cargo.toml +++ b/connector/Cargo.toml @@ -37,6 +37,7 @@ serde = { workspace = true } serde_derive = { workspace = true } log = { workspace = true } +tracing = "0.1.40" anyhow = { workspace = true } smallvec = "1.13.2" diff --git a/connector/src/chain_data.rs b/connector/src/chain_data.rs index 1079bd0..9df2e31 100644 --- a/connector/src/chain_data.rs +++ b/connector/src/chain_data.rs @@ -1,5 +1,5 @@ use crate::chain_data::SlotVectorEffect::*; -use log::{info, trace}; +use log::trace; use smallvec::{smallvec, SmallVec}; use solana_sdk::clock::Slot; use { @@ -12,6 +12,7 @@ use crate::metrics::*; #[derive(Clone, Copy, Debug, PartialEq)] pub enum SlotStatus { + // aka Finalized Rooted, Confirmed, Processed, @@ -82,44 +83,72 @@ impl Default for ChainData { } impl ChainData { - pub fn update_slot(&mut self, new_slot: SlotData) { - let new_processed_head = new_slot.slot > self.newest_processed_slot; + #[tracing::instrument(skip_all, level = "trace")] + pub fn update_slot(&mut self, new_slotdata: SlotData) { + let SlotData { + slot: new_slot, + parent: new_parent, + status: new_status, + .. + } = new_slotdata; + + trace!("update_slot from newslot {:?}", new_slot); + let new_processed_head = new_slot > self.newest_processed_slot; if new_processed_head { - self.newest_processed_slot = new_slot.slot; + self.newest_processed_slot = new_slot; + trace!("use slot {} as newest_processed_slot", new_slot); } let new_rooted_head = - new_slot.slot > self.newest_rooted_slot && new_slot.status == SlotStatus::Rooted; + new_slot > self.newest_rooted_slot && new_status == SlotStatus::Rooted; if new_rooted_head { - self.newest_rooted_slot = new_slot.slot; + self.newest_rooted_slot = new_slot; + trace!("use slot {} as newest_rooted_slot", new_slot); } // Use the highest slot that has a known parent as best chain // (sometimes slots OptimisticallyConfirm before we even know the parent!) - let new_best_chain = new_slot.parent.is_some() && new_slot.slot > self.best_chain_slot; + let new_best_chain = new_parent.is_some() && new_slot > self.best_chain_slot; if new_best_chain { - self.best_chain_slot = new_slot.slot; + self.best_chain_slot = new_slot; + trace!("use slot {} as best_chain_slot", new_slot); } let mut parent_update = false; use std::collections::hash_map::Entry; - match self.slots.entry(new_slot.slot) { + match self.slots.entry(new_slot) { Entry::Vacant(v) => { - v.insert(new_slot); + v.insert(new_slotdata); + trace!("inserted new slot {:?}", new_slot); } Entry::Occupied(o) => { let v = o.into_mut(); - parent_update = v.parent != new_slot.parent && new_slot.parent.is_some(); - v.parent = v.parent.or(new_slot.parent); + parent_update = v.parent != new_parent && new_parent.is_some(); + if parent_update { + trace!( + "update parent of slot {}: {}->{}", + new_slot, + v.parent.unwrap_or(0), + new_parent.unwrap_or(0) + ); + } + v.parent = v.parent.or(new_parent); // Never decrease the slot status - if v.status == SlotStatus::Processed || new_slot.status == SlotStatus::Rooted { - v.status = new_slot.status; + if v.status == SlotStatus::Processed || new_status == SlotStatus::Rooted { + trace!( + "update status of slot {}: {:?}->{:?}", + new_slot, + v.status, + new_status + ); + v.status = new_status; } } }; if new_best_chain || parent_update { + trace!("update chain data for slot {} and ancestors", new_slot); // update the "chain" field down to the first rooted slot let mut slot = self.best_chain_slot; loop { @@ -143,23 +172,9 @@ impl ChainData { self.account_versions_stored = 0; self.account_bytes_stored = 0; - for (_, writes) in self.accounts.iter_mut() { - let newest_rooted_write_slot = Self::newest_rooted_write( - writes, - self.newest_rooted_slot, - self.best_chain_slot, - &self.slots, - ) - .map(|w| w.slot) - // no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway - .unwrap_or(self.newest_rooted_slot + 1); - writes.retain(|w| { - w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot - }); - self.account_versions_stored += writes.len(); - self.account_bytes_stored += - writes.iter().map(|w| w.account.data().len()).sum::() - } + // TODO improve log + trace!("update account data for slot {}", new_slot); + self.clean_accounts_on_new_root(); // now it's fine to drop any slots before the new rooted head // as account writes for non-rooted slots before it have been dropped @@ -167,6 +182,27 @@ impl ChainData { } } + #[tracing::instrument(skip_all, level = "trace")] + fn clean_accounts_on_new_root(&mut self) { + for (_, writes) in self.accounts.iter_mut() { + let newest_rooted_write_slot = Self::newest_rooted_write( + writes, + self.newest_rooted_slot, + self.best_chain_slot, + &self.slots, + ) + .map(|w| w.slot) + // no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway + .unwrap_or(self.newest_rooted_slot + 1); + writes + .retain(|w| w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot); + self.account_versions_stored += writes.len(); + self.account_bytes_stored += + writes.iter().map(|w| w.account.data().len()).sum::() + } + } + + #[tracing::instrument(skip_all, level = "trace")] pub fn update_account(&mut self, pubkey: Pubkey, account: AccountData) { if account.write_version == 0 { // some upstream components provide write_version=0 for snapshot accounts from gMA/gPA @@ -201,7 +237,7 @@ impl ChainData { InsertAfter(pos) => { self.account_versions_stored += 1; self.account_bytes_stored += account.account.data().len(); - v.insert(pos+1, account); + v.insert(pos + 1, account); } DoNothing => {} } @@ -222,6 +258,7 @@ impl ChainData { .unwrap_or(write.slot <= self.newest_rooted_slot || write.slot > self.best_chain_slot) } + // rooted=finalized fn newest_rooted_write<'a>( writes: &'a [AccountData], newest_rooted_slot: u64, @@ -306,6 +343,7 @@ impl ChainData { self.best_chain_slot } + // aka newest finalized pub fn newest_rooted_slot(&self) -> u64 { self.newest_rooted_slot } @@ -416,55 +454,6 @@ mod tests { use solana_sdk::pubkey::Pubkey; use std::str::FromStr; - #[test] - #[ignore] - pub fn test_try_to_reproduce_weird_thing() { - let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap(); - let my_account = Pubkey::new_unique(); - let mut chain_data = ChainData::new(); - - chain_data.update_account( - my_account, - AccountData { - slot: 283573414, - write_version: 1, - account: AccountSharedData::new(100, 100 /*space*/, &owner), - }, - ); - - chain_data.update_slot(SlotData { - slot: 283574172, - parent: None, - status: SlotStatus::Rooted, // =finalized - chain: 0, - }); - - chain_data.update_slot(SlotData { - slot: 283574209, - parent: Some(283574172), - status: SlotStatus::Processed, - chain: 0, - }); - - assert_eq!(chain_data.newest_rooted_slot(), 283574172); - assert_eq!(chain_data.best_chain_slot(), 283574209); - assert_eq!(chain_data.account(&my_account).unwrap().slot, 283573414); - - chain_data.update_account( - my_account, - AccountData { - slot: 283574185, - write_version: 1, - account: AccountSharedData::new(101, 101 /*space*/, &owner), - }, - ); - - assert_eq!(chain_data.newest_rooted_slot(), 283574172); - assert_eq!(chain_data.best_chain_slot(), 283574209); - assert_eq!(chain_data.account(&my_account).unwrap().slot, 283574185); - assert_eq!(chain_data.account(&my_account).unwrap().account.lamports(), 101); - } - #[test] pub fn test_loosing_account_write() { let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap(); @@ -750,4 +739,5 @@ mod tests { }, ] } + }