Skip to content

Commit

Permalink
ChainData: update account InsertAfter bug
Browse files Browse the repository at this point in the history
  • Loading branch information
farnyser authored and grooviegermanikus committed Aug 15, 2024
1 parent 98f05ab commit 015d61d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ serde = { workspace = true }
serde_derive = { workspace = true }

log = { workspace = true }
tracing = "0.1.40"
anyhow = { workspace = true }
smallvec = "1.13.2"

Expand Down
152 changes: 71 additions & 81 deletions connector/src/chain_data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::chain_data::SlotVectorEffect::*;
use log::{info, trace};
use log::trace;
use smallvec::{smallvec, SmallVec};
use solana_sdk::clock::Slot;
use {
Expand All @@ -12,6 +12,7 @@ use crate::metrics::*;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SlotStatus {
// aka Finalized
Rooted,
Confirmed,
Processed,
Expand Down Expand Up @@ -82,44 +83,72 @@ impl Default for ChainData {
}

impl ChainData {
pub fn update_slot(&mut self, new_slot: SlotData) {
let new_processed_head = new_slot.slot > self.newest_processed_slot;
#[tracing::instrument(skip_all, level = "trace")]
pub fn update_slot(&mut self, new_slotdata: SlotData) {
let SlotData {
slot: new_slot,
parent: new_parent,
status: new_status,
..
} = new_slotdata;

trace!("update_slot from newslot {:?}", new_slot);
let new_processed_head = new_slot > self.newest_processed_slot;
if new_processed_head {
self.newest_processed_slot = new_slot.slot;
self.newest_processed_slot = new_slot;
trace!("use slot {} as newest_processed_slot", new_slot);
}

let new_rooted_head =
new_slot.slot > self.newest_rooted_slot && new_slot.status == SlotStatus::Rooted;
new_slot > self.newest_rooted_slot && new_status == SlotStatus::Rooted;
if new_rooted_head {
self.newest_rooted_slot = new_slot.slot;
self.newest_rooted_slot = new_slot;
trace!("use slot {} as newest_rooted_slot", new_slot);
}

// Use the highest slot that has a known parent as best chain
// (sometimes slots OptimisticallyConfirm before we even know the parent!)
let new_best_chain = new_slot.parent.is_some() && new_slot.slot > self.best_chain_slot;
let new_best_chain = new_parent.is_some() && new_slot > self.best_chain_slot;
if new_best_chain {
self.best_chain_slot = new_slot.slot;
self.best_chain_slot = new_slot;
trace!("use slot {} as best_chain_slot", new_slot);
}

let mut parent_update = false;

use std::collections::hash_map::Entry;
match self.slots.entry(new_slot.slot) {
match self.slots.entry(new_slot) {
Entry::Vacant(v) => {
v.insert(new_slot);
v.insert(new_slotdata);
trace!("inserted new slot {:?}", new_slot);
}
Entry::Occupied(o) => {
let v = o.into_mut();
parent_update = v.parent != new_slot.parent && new_slot.parent.is_some();
v.parent = v.parent.or(new_slot.parent);
parent_update = v.parent != new_parent && new_parent.is_some();
if parent_update {
trace!(
"update parent of slot {}: {}->{}",
new_slot,
v.parent.unwrap_or(0),
new_parent.unwrap_or(0)
);
}
v.parent = v.parent.or(new_parent);
// Never decrease the slot status
if v.status == SlotStatus::Processed || new_slot.status == SlotStatus::Rooted {
v.status = new_slot.status;
if v.status == SlotStatus::Processed || new_status == SlotStatus::Rooted {
trace!(
"update status of slot {}: {:?}->{:?}",
new_slot,
v.status,
new_status
);
v.status = new_status;
}
}
};

if new_best_chain || parent_update {
trace!("update chain data for slot {} and ancestors", new_slot);
// update the "chain" field down to the first rooted slot
let mut slot = self.best_chain_slot;
loop {
Expand All @@ -143,30 +172,37 @@ impl ChainData {
self.account_versions_stored = 0;
self.account_bytes_stored = 0;

for (_, writes) in self.accounts.iter_mut() {
let newest_rooted_write_slot = Self::newest_rooted_write(
writes,
self.newest_rooted_slot,
self.best_chain_slot,
&self.slots,
)
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes.retain(|w| {
w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot
});
self.account_versions_stored += writes.len();
self.account_bytes_stored +=
writes.iter().map(|w| w.account.data().len()).sum::<usize>()
}
// TODO improve log
trace!("update account data for slot {}", new_slot);
self.clean_accounts_on_new_root();

// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
}
}

#[tracing::instrument(skip_all, level = "trace")]
fn clean_accounts_on_new_root(&mut self) {
for (_, writes) in self.accounts.iter_mut() {
let newest_rooted_write_slot = Self::newest_rooted_write(
writes,
self.newest_rooted_slot,
self.best_chain_slot,
&self.slots,
)
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes
.retain(|w| w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot);
self.account_versions_stored += writes.len();
self.account_bytes_stored +=
writes.iter().map(|w| w.account.data().len()).sum::<usize>()
}
}

#[tracing::instrument(skip_all, level = "trace")]
pub fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
if account.write_version == 0 {
// some upstream components provide write_version=0 for snapshot accounts from gMA/gPA
Expand Down Expand Up @@ -201,7 +237,7 @@ impl ChainData {
InsertAfter(pos) => {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(pos+1, account);
v.insert(pos + 1, account);
}
DoNothing => {}
}
Expand All @@ -222,6 +258,7 @@ impl ChainData {
.unwrap_or(write.slot <= self.newest_rooted_slot || write.slot > self.best_chain_slot)
}

// rooted=finalized
fn newest_rooted_write<'a>(
writes: &'a [AccountData],
newest_rooted_slot: u64,
Expand Down Expand Up @@ -306,6 +343,7 @@ impl ChainData {
self.best_chain_slot
}

// aka newest finalized
pub fn newest_rooted_slot(&self) -> u64 {
self.newest_rooted_slot
}
Expand Down Expand Up @@ -416,55 +454,6 @@ mod tests {
use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;

#[test]
#[ignore]
pub fn test_try_to_reproduce_weird_thing() {
let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap();
let my_account = Pubkey::new_unique();
let mut chain_data = ChainData::new();

chain_data.update_account(
my_account,
AccountData {
slot: 283573414,
write_version: 1,
account: AccountSharedData::new(100, 100 /*space*/, &owner),
},
);

chain_data.update_slot(SlotData {
slot: 283574172,
parent: None,
status: SlotStatus::Rooted, // =finalized
chain: 0,
});

chain_data.update_slot(SlotData {
slot: 283574209,
parent: Some(283574172),
status: SlotStatus::Processed,
chain: 0,
});

assert_eq!(chain_data.newest_rooted_slot(), 283574172);
assert_eq!(chain_data.best_chain_slot(), 283574209);
assert_eq!(chain_data.account(&my_account).unwrap().slot, 283573414);

chain_data.update_account(
my_account,
AccountData {
slot: 283574185,
write_version: 1,
account: AccountSharedData::new(101, 101 /*space*/, &owner),
},
);

assert_eq!(chain_data.newest_rooted_slot(), 283574172);
assert_eq!(chain_data.best_chain_slot(), 283574209);
assert_eq!(chain_data.account(&my_account).unwrap().slot, 283574185);
assert_eq!(chain_data.account(&my_account).unwrap().account.lamports(), 101);
}

#[test]
pub fn test_loosing_account_write() {
let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap();
Expand Down Expand Up @@ -750,4 +739,5 @@ mod tests {
},

Check warning on line 739 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
]
}

}

0 comments on commit 015d61d

Please sign in to comment.