Skip to content

Commit

Permalink
Merge branch 'main' into hacking/slots-map-using-intmap
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 17, 2024
2 parents 615175d + 24d5d30 commit 2043236
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 61 deletions.
40 changes: 28 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,3 @@ rustls = "0.20.8"

warp = "0.3"



1 change: 1 addition & 0 deletions chaindata_standalone/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
benchinput/
8 changes: 4 additions & 4 deletions chaindata_standalone/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ name = "chaindata_standalone"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "chaindata_standalone"
path = "src/standalone.rs"

[[bin]]
name = "replay_slot_account_stream"
path = "src/replay_slot_account_stream.rs"
Expand Down Expand Up @@ -53,4 +49,8 @@ solana-sdk = { workspace = true }
solana-client = { workspace = true }
solana-rpc-client-api = "1.17"
solana-account-decoder = "1.17"
clap = { version = "3.2.25", features = ["derive"] }

[lints.clippy]
needless_return = "allow"
enum_glob_use = "deny"
1 change: 1 addition & 0 deletions chaindata_standalone/src/account_write.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(dead_code)]
use serde_derive::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
Expand Down
2 changes: 2 additions & 0 deletions chaindata_standalone/src/get_program_account.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

use std::collections::HashSet;
use std::{str::FromStr, sync::Arc, time::Duration};

Expand Down
4 changes: 2 additions & 2 deletions chaindata_standalone/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use prometheus::{IntCounter, register_int_counter};
use prometheus::{register_int_counter, IntCounter};
lazy_static::lazy_static! {
pub static ref SNAPSHOT_ACCOUNTS_CNT: IntCounter =
register_int_counter!("SNAPSHOT_ACCOUNTS_CNT", "Counter").unwrap();
Expand All @@ -16,4 +16,4 @@ lazy_static::lazy_static! {
register_int_counter!("CHAINDATA_SNAP_UPDATE_ACCOUNT", "Counter").unwrap();
pub static ref ACCOUNT_UPDATE_SENDER: IntCounter =
register_int_counter!("ACCOUNT_UPDATE_SENDER", "Counter").unwrap();
}
}
24 changes: 15 additions & 9 deletions chaindata_standalone/src/metrics_dump.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use std::f64::NAN;
use std::time::Duration;
use log::{debug, info, trace};
use prometheus::core::{Collector, Metric};
use prometheus::IntCounter;
use tokio::time::{Instant, Interval, interval, sleep};
use std::f64::NAN;
use std::time::Duration;
use tokio::time::{interval, sleep, Instant, Interval};

pub fn start_metrics_dumper(prom_counter: &IntCounter) {
let metric = prom_counter.clone();
let name = prom_counter.desc().get(0).map(|x| x.fq_name.clone()).unwrap_or("noname".to_string());
let name = prom_counter
.desc()
.get(0)
.map(|x| x.fq_name.clone())
.unwrap_or("noname".to_string());

tokio::spawn(async move {

let mut interval = interval(Duration::from_millis(3000));

let mut last_observed_at = Instant::now();
Expand All @@ -23,15 +26,18 @@ pub fn start_metrics_dumper(prom_counter: &IntCounter) {
if last_observed_value != u64::MIN {
let elapsed = last_observed_at.elapsed().as_secs_f64();
let delta = value - last_observed_value;
debug!("counter <{}> (value={}) with throughput {:.1}/s", name, value, delta as f64 / elapsed);
debug!(
"counter <{}> (value={}) with throughput {:.1}/s",
name,
value,
delta as f64 / elapsed
);
}

last_observed_value = value;
last_observed_at = Instant::now();

interval.tick().await;
}


});
}
}
96 changes: 70 additions & 26 deletions chaindata_standalone/src/replay_slot_account_stream.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,74 @@
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Instant;
use csv::ReaderBuilder;
use itertools::Itertools;
use log::{info, trace};
use mango_feeds_connector::chain_data::{AccountData, ChainData, SlotData, SlotStatus};
use solana_sdk::account::AccountSharedData;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentLevel;
use solana_sdk::pubkey::Pubkey;
use tracing_subscriber::EnvFilter;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::str::FromStr;
use std::time::Instant;
use tracing_subscriber::fmt::format::FmtSpan;
use mango_feeds_connector::chain_data::{AccountData, ChainData, SlotData, SlotStatus};
use tracing_subscriber::EnvFilter;

// #[derive(Parser, Debug, Clone)]
// #[clap()]
// struct Cli {
// #[clap(short, long)]
// replay_file: Option<String>,
// }

const RAYDIUM_AMM_PUBKEY: &'static str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";
const RAYDIUM_AMM_PUBKEY: &str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";

pub fn main() {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_span_events(FmtSpan::CLOSE)
.init();

// long file with 5032825 entries
let _slot_stream_dump_file = PathBuf::from_str("/Users/stefan/mango/projects/mango-feeds-connector/dump-slot-acccounts-fsn4-mixed.csv").unwrap();
// 500k
let slot_stream_dump_file = PathBuf::from_str("/Users/stefan/mango/projects/mango-feeds-connector/dump-slot-acccounts-fsn4-mixed-500k.csv").unwrap();
// let Cli { replay_file } = Cli::parse();

let mut chain_data = ChainData::new();
let mut slot_cnt = 0;
let mut account_cnt = 0;
let started_at = Instant::now();
// read lins
let file = File::open(slot_stream_dump_file).unwrap();
let lines = io::BufReader::new(file).lines();
for line in lines.flatten() {
// read line
// let buffer: BufReader<_> = match replay_file {
// None => {
// info!("use data from inline csv");
// // 500k
// let data = include_bytes!("dump-slot-acccounts-fsn4-mixed-500k.csv");
// io::BufReader::new(data.as_ref())
// }
// Some(slot_stream_dump_file) => {
// info!("use replay_file: {}", slot_stream_dump_file);
// let file = File::open(slot_stream_dump_file).unwrap();
// io::BufReader::new(file)
// }
// };

let buffer = {
info!("use data from inline csv");
// 500k
let slot_stream_dump_file =
"chaindata_standalone/benchinput/dump-slot-acccounts-fsn4-mixed-500k.csv";
let file = File::open(slot_stream_dump_file).unwrap();
io::BufReader::new(file)
};
for line in buffer.lines().map_while(Result::ok) {
// update_slot.slot, update_slot.parent.unwrap_or(0), short_status, since_epoch_ms
// slot, account_pk, write_version, data_len, since_epoch_ms
let rows = line.split(",").collect_vec();
let rows = line.split(',').collect_vec();

if rows[0] == "MIXSLOT" {
let slot: u64 = rows[1].parse().unwrap();
let parent: Option<u64> = rows[2].parse().ok().and_then(|v| if v == 0 { None } else { Some(v) });
let parent: Option<u64> =
rows[2]
.parse()
.ok()
.and_then(|v| if v == 0 { None } else { Some(v) });
let commitment_level = match rows[3].to_string().as_str() {
"P" => CommitmentLevel::Processed,
"C" => CommitmentLevel::Confirmed,
Expand All @@ -58,7 +84,13 @@ pub fn main() {
_ => panic!("invalid commitment level"),
};
const INIT_CHAIN: Slot = 0;
trace!("MIXSLOT slot: {}, parent: {:?}, status: {:?}, since_epoch_ms: {}", slot, parent, slot_status, since_epoch_ms);
trace!(
"MIXSLOT slot: {}, parent: {:?}, status: {:?}, since_epoch_ms: {}",
slot,
parent,
slot_status,
since_epoch_ms
);
let slot_data = SlotData {
slot,
parent,
Expand All @@ -67,7 +99,6 @@ pub fn main() {
};
slot_cnt += 1;
chain_data.update_slot(slot_data);

} else if rows[0] == "MIXACCOUNT" {
let slot: u64 = rows[1].parse().unwrap();
let account_pk: String = rows[2].parse().unwrap();
Expand All @@ -80,18 +111,31 @@ pub fn main() {
let account_data = AccountData {
slot,
write_version,
account: AccountSharedData::new(slot, data_len as usize, &Pubkey::from_str(RAYDIUM_AMM_PUBKEY).unwrap())
account: AccountSharedData::new(
slot,
data_len as usize,
&Pubkey::from_str(RAYDIUM_AMM_PUBKEY).unwrap(),
),
};

account_cnt += 1;
chain_data.update_account(account_pk, account_data);
}

if (slot_cnt + account_cnt) % 100_000 == 0 {
info!("progress .. slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", slot_cnt, account_cnt, started_at.elapsed().as_secs_f64());
info!(
"progress .. slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s",
slot_cnt,
account_cnt,
started_at.elapsed().as_secs_f64()
);
}
}

info!("slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", slot_cnt, account_cnt, started_at.elapsed().as_secs_f64());

info!(
"slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s",
slot_cnt,
account_cnt,
started_at.elapsed().as_secs_f64()
);
}
4 changes: 2 additions & 2 deletions connector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "mango-feeds-connector"
version = "0.3.0"
authors = ["Christian Kamm <[email protected]>"]
version = "0.4.7"
authors = ["Christian Kamm <[email protected]>", "[email protected]"]
edition = "2021"
license = "AGPL-3.0-or-later"
description = "Listen to Solana account updates via geyser or websockets"
Expand Down
Loading

0 comments on commit 2043236

Please sign in to comment.