diff --git a/Cargo.lock b/Cargo.lock index 5022028..3991cac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -797,12 +797,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.91" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd97381a8cc6493395a5afc4c691c1084b3768db713b73aa215217aa245d153" +checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -825,6 +826,8 @@ dependencies = [ "async-trait", "base64 0.21.7", "bincode", + "clap 3.2.25", + "csv", "futures 0.3.30", "geyser-grpc-connector", "itertools 0.10.5", @@ -1181,9 +1184,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" dependencies = [ "csv-core", "itoa", @@ -2190,6 +2193,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "intmap" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee87fd093563344074bacf24faa0bb0227fb6969fb223e922db798516de924d6" + [[package]] name = "iovec" version = "0.1.4" @@ -2231,9 +2240,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "jobserver" -version = "0.1.28" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] @@ -2485,19 +2494,18 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "lz4" -version = "1.26.0" +version = "1.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958b4caa893816eea05507c20cfe47574a43d9a697138a7872990bba8a0ece68" +checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" dependencies = [ - "libc", "lz4-sys", ] [[package]] name = "lz4-sys" -version = "1.10.0" +version = "1.11.1+lz4-1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" dependencies = [ "cc", "libc", @@ -2505,14 +2513,16 @@ dependencies = [ [[package]] name = "mango-feeds-connector" -version = "0.3.0" +version = "0.4.7" dependencies = [ "anyhow", "async-channel", "async-trait", "clap 3.2.25", "criterion", + "csv", "futures 0.3.30", + "intmap", "itertools 0.10.5", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", @@ -4324,6 +4334,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 716d98f..c19bc87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,5 +53,3 @@ rustls = "0.20.8" warp = "0.3" - - diff --git a/chaindata_standalone/.gitignore b/chaindata_standalone/.gitignore new file mode 100644 index 0000000..f76246f --- /dev/null +++ b/chaindata_standalone/.gitignore @@ -0,0 +1 @@ +benchinput/ diff --git a/chaindata_standalone/Cargo.toml b/chaindata_standalone/Cargo.toml index 6d00cc7..5e82244 100644 --- a/chaindata_standalone/Cargo.toml +++ b/chaindata_standalone/Cargo.toml @@ -3,10 +3,6 @@ name = "chaindata_standalone" version = "0.1.0" edition = "2021" -[[bin]] -name = "chaindata_standalone" -path = "src/standalone.rs" - [[bin]] name = "replay_slot_account_stream" path = "src/replay_slot_account_stream.rs" @@ -53,4 +49,8 @@ solana-sdk = { workspace = true } solana-client = { workspace = true } solana-rpc-client-api = "1.17" solana-account-decoder = "1.17" +clap = { version = "3.2.25", features = ["derive"] } +[lints.clippy] +needless_return = "allow" +enum_glob_use = "deny" diff --git a/chaindata_standalone/src/account_write.rs b/chaindata_standalone/src/account_write.rs index 6b6deee..9295a5a 100644 --- a/chaindata_standalone/src/account_write.rs +++ b/chaindata_standalone/src/account_write.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use serde_derive::{Deserialize, Serialize}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; diff --git a/chaindata_standalone/src/get_program_account.rs b/chaindata_standalone/src/get_program_account.rs index 0e90366..41db1a8 100644 --- a/chaindata_standalone/src/get_program_account.rs +++ b/chaindata_standalone/src/get_program_account.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use std::collections::HashSet; use std::{str::FromStr, sync::Arc, time::Duration}; diff --git a/chaindata_standalone/src/metrics.rs b/chaindata_standalone/src/metrics.rs index 9bbfaf3..7833e6c 100644 --- a/chaindata_standalone/src/metrics.rs +++ b/chaindata_standalone/src/metrics.rs @@ -1,4 +1,4 @@ -use prometheus::{IntCounter, register_int_counter}; +use prometheus::{register_int_counter, IntCounter}; lazy_static::lazy_static! { pub static ref SNAPSHOT_ACCOUNTS_CNT: IntCounter = register_int_counter!("SNAPSHOT_ACCOUNTS_CNT", "Counter").unwrap(); @@ -16,4 +16,4 @@ lazy_static::lazy_static! { register_int_counter!("CHAINDATA_SNAP_UPDATE_ACCOUNT", "Counter").unwrap(); pub static ref ACCOUNT_UPDATE_SENDER: IntCounter = register_int_counter!("ACCOUNT_UPDATE_SENDER", "Counter").unwrap(); -} \ No newline at end of file +} diff --git a/chaindata_standalone/src/metrics_dump.rs b/chaindata_standalone/src/metrics_dump.rs index 9ca6438..92deb09 100644 --- a/chaindata_standalone/src/metrics_dump.rs +++ b/chaindata_standalone/src/metrics_dump.rs @@ -1,16 +1,19 @@ -use std::f64::NAN; -use std::time::Duration; use log::{debug, info, trace}; use prometheus::core::{Collector, Metric}; use prometheus::IntCounter; -use tokio::time::{Instant, Interval, interval, sleep}; +use std::f64::NAN; +use std::time::Duration; +use tokio::time::{interval, sleep, Instant, Interval}; pub fn start_metrics_dumper(prom_counter: &IntCounter) { let metric = prom_counter.clone(); - let name = prom_counter.desc().get(0).map(|x| x.fq_name.clone()).unwrap_or("noname".to_string()); + let name = prom_counter + .desc() + .get(0) + .map(|x| x.fq_name.clone()) + .unwrap_or("noname".to_string()); tokio::spawn(async move { - let mut interval = interval(Duration::from_millis(3000)); let mut last_observed_at = Instant::now(); @@ -23,7 +26,12 @@ pub fn start_metrics_dumper(prom_counter: &IntCounter) { if last_observed_value != u64::MIN { let elapsed = last_observed_at.elapsed().as_secs_f64(); let delta = value - last_observed_value; - debug!("counter <{}> (value={}) with throughput {:.1}/s", name, value, delta as f64 / elapsed); + debug!( + "counter <{}> (value={}) with throughput {:.1}/s", + name, + value, + delta as f64 / elapsed + ); } last_observed_value = value; @@ -31,7 +39,5 @@ pub fn start_metrics_dumper(prom_counter: &IntCounter) { interval.tick().await; } - - }); -} \ No newline at end of file +} diff --git a/chaindata_standalone/src/replay_slot_account_stream.rs b/chaindata_standalone/src/replay_slot_account_stream.rs index abd72da..877ac63 100644 --- a/chaindata_standalone/src/replay_slot_account_stream.rs +++ b/chaindata_standalone/src/replay_slot_account_stream.rs @@ -1,21 +1,26 @@ -use std::fs::File; -use std::io; -use std::io::BufRead; -use std::path::PathBuf; -use std::str::FromStr; -use std::time::Instant; -use csv::ReaderBuilder; use itertools::Itertools; use log::{info, trace}; +use mango_feeds_connector::chain_data::{AccountData, ChainData, SlotData, SlotStatus}; use solana_sdk::account::AccountSharedData; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentLevel; use solana_sdk::pubkey::Pubkey; -use tracing_subscriber::EnvFilter; +use std::fs::File; +use std::io; +use std::io::BufRead; +use std::str::FromStr; +use std::time::Instant; use tracing_subscriber::fmt::format::FmtSpan; -use mango_feeds_connector::chain_data::{AccountData, ChainData, SlotData, SlotStatus}; +use tracing_subscriber::EnvFilter; + +// #[derive(Parser, Debug, Clone)] +// #[clap()] +// struct Cli { +// #[clap(short, long)] +// replay_file: Option, +// } -const RAYDIUM_AMM_PUBKEY: &'static str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; +const RAYDIUM_AMM_PUBKEY: &str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; pub fn main() { tracing_subscriber::fmt() @@ -23,26 +28,47 @@ pub fn main() { .with_span_events(FmtSpan::CLOSE) .init(); - // long file with 5032825 entries - let _slot_stream_dump_file = PathBuf::from_str("/Users/stefan/mango/projects/mango-feeds-connector/dump-slot-acccounts-fsn4-mixed.csv").unwrap(); - // 500k - let slot_stream_dump_file = PathBuf::from_str("/Users/stefan/mango/projects/mango-feeds-connector/dump-slot-acccounts-fsn4-mixed-500k.csv").unwrap(); + // let Cli { replay_file } = Cli::parse(); let mut chain_data = ChainData::new(); let mut slot_cnt = 0; let mut account_cnt = 0; let started_at = Instant::now(); - // read lins - let file = File::open(slot_stream_dump_file).unwrap(); - let lines = io::BufReader::new(file).lines(); - for line in lines.flatten() { + // read line + // let buffer: BufReader<_> = match replay_file { + // None => { + // info!("use data from inline csv"); + // // 500k + // let data = include_bytes!("dump-slot-acccounts-fsn4-mixed-500k.csv"); + // io::BufReader::new(data.as_ref()) + // } + // Some(slot_stream_dump_file) => { + // info!("use replay_file: {}", slot_stream_dump_file); + // let file = File::open(slot_stream_dump_file).unwrap(); + // io::BufReader::new(file) + // } + // }; + + let buffer = { + info!("use data from inline csv"); + // 500k + let slot_stream_dump_file = + "chaindata_standalone/benchinput/dump-slot-acccounts-fsn4-mixed-500k.csv"; + let file = File::open(slot_stream_dump_file).unwrap(); + io::BufReader::new(file) + }; + for line in buffer.lines().map_while(Result::ok) { // update_slot.slot, update_slot.parent.unwrap_or(0), short_status, since_epoch_ms // slot, account_pk, write_version, data_len, since_epoch_ms - let rows = line.split(",").collect_vec(); + let rows = line.split(',').collect_vec(); if rows[0] == "MIXSLOT" { let slot: u64 = rows[1].parse().unwrap(); - let parent: Option = rows[2].parse().ok().and_then(|v| if v == 0 { None } else { Some(v) }); + let parent: Option = + rows[2] + .parse() + .ok() + .and_then(|v| if v == 0 { None } else { Some(v) }); let commitment_level = match rows[3].to_string().as_str() { "P" => CommitmentLevel::Processed, "C" => CommitmentLevel::Confirmed, @@ -58,7 +84,13 @@ pub fn main() { _ => panic!("invalid commitment level"), }; const INIT_CHAIN: Slot = 0; - trace!("MIXSLOT slot: {}, parent: {:?}, status: {:?}, since_epoch_ms: {}", slot, parent, slot_status, since_epoch_ms); + trace!( + "MIXSLOT slot: {}, parent: {:?}, status: {:?}, since_epoch_ms: {}", + slot, + parent, + slot_status, + since_epoch_ms + ); let slot_data = SlotData { slot, parent, @@ -67,7 +99,6 @@ pub fn main() { }; slot_cnt += 1; chain_data.update_slot(slot_data); - } else if rows[0] == "MIXACCOUNT" { let slot: u64 = rows[1].parse().unwrap(); let account_pk: String = rows[2].parse().unwrap(); @@ -80,7 +111,11 @@ pub fn main() { let account_data = AccountData { slot, write_version, - account: AccountSharedData::new(slot, data_len as usize, &Pubkey::from_str(RAYDIUM_AMM_PUBKEY).unwrap()) + account: AccountSharedData::new( + slot, + data_len as usize, + &Pubkey::from_str(RAYDIUM_AMM_PUBKEY).unwrap(), + ), }; account_cnt += 1; @@ -88,10 +123,19 @@ pub fn main() { } if (slot_cnt + account_cnt) % 100_000 == 0 { - info!("progress .. slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", slot_cnt, account_cnt, started_at.elapsed().as_secs_f64()); + info!( + "progress .. slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", + slot_cnt, + account_cnt, + started_at.elapsed().as_secs_f64() + ); } } - info!("slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", slot_cnt, account_cnt, started_at.elapsed().as_secs_f64()); - + info!( + "slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", + slot_cnt, + account_cnt, + started_at.elapsed().as_secs_f64() + ); } diff --git a/connector/Cargo.toml b/connector/Cargo.toml index 0d8c2b0..fe4e26d 100644 --- a/connector/Cargo.toml +++ b/connector/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mango-feeds-connector" -version = "0.3.0" -authors = ["Christian Kamm "] +version = "0.4.7" +authors = ["Christian Kamm ", "groovie@mango.markets"] edition = "2021" license = "AGPL-3.0-or-later" description = "Listen to Solana account updates via geyser or websockets" diff --git a/connector/src/chain_data.rs b/connector/src/chain_data.rs index 402b8ba..1a4ead7 100644 --- a/connector/src/chain_data.rs +++ b/connector/src/chain_data.rs @@ -1,6 +1,6 @@ use intmap::IntMap; use crate::chain_data::SlotVectorEffect::*; -use log::{info, trace}; +use log::trace; use smallvec::{smallvec, SmallVec}; use solana_sdk::clock::Slot; use warp::trace; @@ -14,6 +14,7 @@ use crate::metrics::*; #[derive(Clone, Copy, Debug, PartialEq)] pub enum SlotStatus { + // aka Finalized Rooted, Confirmed, Processed, @@ -122,12 +123,18 @@ impl ChainData { let v = o.into_mut(); parent_update = v.parent != new_parent && new_parent.is_some(); if parent_update { - trace!("update parent of slot {}: {}->{}", new_slot, v.parent.unwrap_or(0), new_parent.unwrap_or(0)); + trace!("update parent of slot {}: {}->{}", + new_slot, v.parent.unwrap_or(0), new_parent.unwrap_or(0)); } v.parent = v.parent.or(new_parent); // Never decrease the slot status if v.status == SlotStatus::Processed || new_status == SlotStatus::Rooted { - trace!("update status of slot {}: {:?}->{:?}", new_slot, v.status, new_status); + trace!( + "update status of slot {}: {:?}->{:?}", + new_slot, + v.status, + new_status + ); v.status = new_status; } } @@ -224,7 +231,7 @@ impl ChainData { InsertAfter(pos) => { self.account_versions_stored += 1; self.account_bytes_stored += account.account.data().len(); - v.insert(pos, account); + v.insert(pos + 1, account); } DoNothing => {} } @@ -245,6 +252,7 @@ impl ChainData { .unwrap_or(write.slot <= self.newest_rooted_slot || write.slot > self.best_chain_slot) } + // rooted=finalized fn newest_rooted_write<'a>( writes: &'a [AccountData], newest_rooted_slot: u64, @@ -329,6 +337,7 @@ impl ChainData { self.best_chain_slot } + // aka newest finalized pub fn newest_rooted_slot(&self) -> u64 { self.newest_rooted_slot } @@ -439,6 +448,60 @@ mod tests { use csv::ReaderBuilder; use solana_sdk::commitment_config::CommitmentLevel; + #[test] + pub fn test_loosing_account_write() { + let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap(); + let my_account = Pubkey::new_unique(); + let mut chain_data = ChainData::new(); + + chain_data.update_account( + my_account, + AccountData { + slot: 123, + write_version: 1, + account: AccountSharedData::new(100, 100 /*space*/, &owner), + }, + ); + + chain_data.update_slot(SlotData { + slot: 123, + parent: None, + status: SlotStatus::Rooted, // =finalized + chain: 0, + }); + + chain_data.update_account( + my_account, + AccountData { + slot: 128, + write_version: 1, + account: AccountSharedData::new(101, 101 /*space*/, &owner), + }, + ); + + chain_data.update_slot(SlotData { + slot: 128, + parent: Some(123), + status: SlotStatus::Processed, + chain: 0, + }); + + assert_eq!(chain_data.newest_rooted_slot(), 123); + assert_eq!(chain_data.best_chain_slot(), 128); + assert_eq!(chain_data.account(&my_account).unwrap().slot, 128); + + chain_data.update_slot(SlotData { + slot: 129, + parent: Some(128), + status: SlotStatus::Processed, + chain: 0, + }); + + assert_eq!(chain_data.newest_rooted_slot(), 123); + assert_eq!(chain_data.best_chain_slot(), 129); + assert_eq!(chain_data.account(&my_account).unwrap().slot, 128); + } + #[test] pub fn test_move_slot_to_finalized() { const SLOT: Slot = 42_000_000;