Skip to content

Commit

Permalink
Man0s/crossbar legacy indexer (#245)
Browse files Browse the repository at this point in the history
* cli: lut check cmd

* indexer: swb pull support

* fix: exclude indexer from workspace
  • Loading branch information
losman0s authored Sep 6, 2024
1 parent 24dfeb1 commit d33e649
Show file tree
Hide file tree
Showing 12 changed files with 464 additions and 40 deletions.
12 changes: 12 additions & 0 deletions clients/rust/marginfi-cli/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ pub enum GroupCommand {
#[clap(short = 't', long)]
existing_token_lookup_tables: Vec<Pubkey>,
},
CheckLookupTable {
#[clap(short = 't', long)]
existing_token_lookup_tables: Vec<Pubkey>,
},
}

#[derive(Clone, Copy, Debug, Parser, ArgEnum)]
Expand Down Expand Up @@ -606,6 +610,14 @@ fn group(subcmd: GroupCommand, global_options: &GlobalOptions) -> Result<()> {
processor::handle_bankruptcy_for_accounts(&config, &profile, accounts)
}

GroupCommand::CheckLookupTable {
existing_token_lookup_tables,
} => processor::group::process_check_lookup_tables(
&config,
&profile,
existing_token_lookup_tables,
),

GroupCommand::UpdateLookupTable {
existing_token_lookup_tables,
} => processor::group::process_update_lookup_tables(
Expand Down
121 changes: 121 additions & 0 deletions clients/rust/marginfi-cli/src/processor/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,127 @@ use std::mem::size_of;
const CHUNK_SIZE: usize = 22;
const KEY_BATCH_SIZE: usize = 20;

/// Verifies that the given address lookup tables collectively contain every
/// key the group's banks need (bank, vault, vault authority, oracle keys,
/// plus the program/group/common-program keys), and reports any table whose
/// authority differs from the configured authority.
pub fn process_check_lookup_tables(
    config: &Config,
    profile: &Profile,
    existing_lookup_tables: Vec<Pubkey>,
) -> Result<()> {
    let rpc = config.mfi_program.rpc();
    let marginfi_group = profile.marginfi_group.expect("group not set");

    // Fetch the lookup-table accounts in batches, keeping each account paired
    // with its address. NOTE(fix): the previous implementation flattened away
    // missing accounts and then zipped against the full address list, which
    // misaligned addresses with accounts as soon as one table was missing.
    let mut table_accounts: Vec<(Pubkey, Account)> =
        Vec::with_capacity(existing_lookup_tables.len());
    for chunk in existing_lookup_tables.chunks(CHUNK_SIZE) {
        for (address, maybe_account) in chunk.iter().zip(rpc.get_multiple_accounts(chunk)?) {
            match maybe_account {
                Some(account) => table_accounts.push((*address, account)),
                None => println!("Lookup table {} not found on-chain", address),
            }
        }
    }

    // Deserialize each table (zero-copy borrow of the account data) and flag
    // any table whose authority is not the configured authority.
    let lookup_tables: Vec<AddressLookupTable> = table_accounts
        .iter()
        .map(|(address, account)| {
            let lookup_table = AddressLookupTable::deserialize(&account.data)
                .expect("failed to deserialize address lookup table");
            println!(
                "Loaded table {} with {} addresses",
                address,
                lookup_table.addresses.len()
            );

            if lookup_table.meta.authority != Some(config.authority()) {
                println!(
                    "Lookup table {} has wrong authority {:?}",
                    address, lookup_table.meta.authority,
                );
            }

            lookup_table
        })
        .collect();

    // Load every bank belonging to the profile's group; the memcmp offset
    // skips the 8-byte discriminator plus the fields preceding the group key.
    let banks = config
        .mfi_program
        .accounts::<Bank>(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
            8 + size_of::<Pubkey>() + size_of::<u8>(),
            marginfi_group.to_bytes().to_vec(),
        ))])?;

    // Collect every key the lookup tables are expected to contain:
    // program + group + common programs, then per-bank accounts and oracles.
    let mut keys = vec![
        config.mfi_program.id(),
        marginfi_group,
        spl_token::id(),
        system_program::id(),
    ];

    for (bank_pk, bank) in banks.iter() {
        keys.push(*bank_pk);
        keys.push(bank.liquidity_vault);

        let (vault_auth, _) = utils::find_bank_vault_authority_pda(
            bank_pk,
            marginfi::state::marginfi_group::BankVaultType::Liquidity,
            &marginfi::ID,
        );
        keys.push(vault_auth);

        keys.extend(
            bank.config
                .oracle_keys
                .iter()
                .filter(|pk| **pk != Pubkey::default())
                .copied(),
        );
    }

    // Dedup globally while preserving order. NOTE(fix): Vec::dedup only
    // removes *consecutive* duplicates, so shared keys (e.g. oracles used by
    // several banks) were previously counted more than once.
    let mut seen = std::collections::HashSet::new();
    keys.retain(|pk| seen.insert(*pk));

    // A key is missing when no loaded table contains it.
    let missing_keys = keys
        .iter()
        .filter(|pk| {
            let missing = !lookup_tables
                .iter()
                .any(|lookup_table| lookup_table.addresses.contains(*pk));

            println!("Key {} missing: {}", pk, missing);

            missing
        })
        .cloned()
        .collect::<Vec<Pubkey>>();

    println!("Missing {} keys", missing_keys.len());

    Ok(())
}

pub fn process_update_lookup_tables(
config: &Config,
profile: &Profile,
Expand Down
16 changes: 12 additions & 4 deletions observability/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,22 @@ marginfi = { path = "../../programs/marginfi", features = [
] }

gcp-bigquery-client = "0.16.7"
google-cloud-default = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = ["pubsub"] }
google-cloud-default = { git = "https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = [
"pubsub",
] }
google-cloud-auth = { git = "https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994" }
google-cloud-pubsub = { git = "https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994" }
google-cloud-gax = { git = "https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994" }
google-cloud-googleapis = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = ["bytes", "pubsub"] }
google-cloud-googleapis = { git = "https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = [
"bytes",
"pubsub",
] }
yup-oauth2 = "8.3.0"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "a2cd1498ac64baa1017d4a4cdefbf46100215b4c" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "a2cd1498ac64baa1017d4a4cdefbf46100215b4c" }
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" }
switchboard-on-demand-client = "0.1.7"
switchboard-on-demand = "0.1.7"
hex = "0.4.3"
fixed = "1.12.0"
fixed-macro = "1.2.0"
dotenv = "0.15.0"
Expand Down
12 changes: 7 additions & 5 deletions observability/indexer/src/commands/index_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ pub async fn index_accounts(config: IndexAccountsConfig) -> Result<()> {
async fn listen_to_updates(ctx: Arc<Context>) {
loop {
info!("Connecting geyser client");
let geyser_client_connection_result = GeyserGrpcClient::connect(
ctx.config.rpc_endpoint.to_string(),
Some(ctx.config.rpc_token.to_string()),
None,
);
let geyser_client_connection_result =
GeyserGrpcClient::build_from_shared(ctx.config.rpc_endpoint.to_string())
.unwrap()
.x_token(Some(ctx.config.rpc_token.to_string()))
.unwrap()
.connect()
.await;

let mut geyser_client = match geyser_client_connection_result {
Ok(geyser_client) => geyser_client,
Expand Down
17 changes: 7 additions & 10 deletions observability/indexer/src/commands/index_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,13 @@ pub async fn index_transactions(config: IndexTransactionsConfig) -> Result<()> {
async fn listen_to_updates(ctx: Arc<Context>) {
loop {
info!("Connecting geyser client");
let geyser_client_connection_result = GeyserGrpcClient::connect_with_timeout(
ctx.config.rpc_endpoint.to_string(),
Some(ctx.config.rpc_token.to_string()),
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
false,
)
.await;
info!("Connected");
let geyser_client_connection_result =
GeyserGrpcClient::build_from_shared(ctx.config.rpc_endpoint.to_string())
.unwrap()
.x_token(Some(ctx.config.rpc_token.to_string()))
.unwrap()
.connect()
.await;

let mut geyser_client = match geyser_client_connection_result {
Ok(geyser_client) => geyser_client,
Expand Down
67 changes: 56 additions & 11 deletions observability/indexer/src/commands/snapshot_accounts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::utils::convert_account;
use crate::utils::crossbar::{CrossbarCache, SwbPullFeedMeta};
use crate::utils::metrics::{LendingPoolBankMetrics, MarginfiAccountMetrics, MarginfiGroupMetrics};
use crate::utils::snapshot::Snapshot;
use crate::utils::snapshot::{AccountRoutingType, BankUpdateRoutingType};
use crate::utils::snapshot::{OracleData, Snapshot};
use crate::utils::swb_pull::overwrite_price_from_sim;
use anyhow::Result;
use chrono::{DateTime, Utc};
use envconfig::Envconfig;
Expand Down Expand Up @@ -112,14 +114,16 @@ pub struct Context {
account_updates_queue: Arc<Mutex<BTreeMap<u64, HashMap<Pubkey, AccountUpdate>>>>,
latest_slots_with_commitment: Arc<Mutex<BTreeSet<u64>>>,
account_snapshot: Arc<Mutex<Snapshot>>,
crossbar_store: Arc<CrossbarCache>,
stream_disconnection_count: Arc<AtomicU64>,
update_processing_error_count: Arc<AtomicU64>,
}

impl Context {
pub async fn new(config: &SnapshotAccountsConfig) -> Self {
let rpc_endpoint = format!("{}/{}", config.rpc_endpoint, config.rpc_token);
let rpc_client = Arc::new(RpcClient::new_with_commitment(
format!("{}/{}", config.rpc_endpoint, config.rpc_token),
rpc_endpoint,
CommitmentConfig {
commitment: solana_sdk::commitment_config::CommitmentLevel::Finalized,
},
Expand All @@ -132,6 +136,7 @@ impl Context {
account_updates_queue: Arc::new(Mutex::new(BTreeMap::new())),
latest_slots_with_commitment: Arc::new(Mutex::new(BTreeSet::new())),
account_snapshot: Arc::new(Mutex::new(Snapshot::new(config.program_id, rpc_client))),
crossbar_store: Arc::new(CrossbarCache::new()),
stream_disconnection_count: Arc::new(AtomicU64::new(0)),
update_processing_error_count: Arc::new(AtomicU64::new(0)),
}
Expand Down Expand Up @@ -188,6 +193,26 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> {
snapshot.init().await.unwrap();
info!("Summary: {snapshot}");

let swb_feed_accounts_and_hashes = snapshot
.price_feeds
.iter()
.filter_map(|(pk, od)| match od {
OracleData::SwitchboardPull(feed) => Some((*pk, hex::encode(feed.feed.feed_hash))),
_ => None,
})
.collect::<Vec<_>>();

context.crossbar_store.track_feeds(
swb_feed_accounts_and_hashes
.into_iter()
.map(|(feed_address, feed_hash)| SwbPullFeedMeta {
feed_hash,
feed_address,
})
.collect::<Vec<_>>(),
);
context.crossbar_store.refresh_prices().await;

snapshot
.routing_lookup
.iter()
Expand All @@ -207,6 +232,26 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> {
let geyser_subscription_config = compute_geyser_config(&config, &non_program_accounts).await;
*context.geyser_subscription_config.lock().await = (false, geyser_subscription_config.clone());

let update_crossbar_cache_handle = tokio::spawn({
let context = context.clone();
async move {
loop {
context.crossbar_store.refresh_prices().await;
let mut snapshot = context.account_snapshot.lock().await;
let feeds_per_address: HashMap<Pubkey, crate::utils::crossbar::SimulatedPrice> =
context.crossbar_store.get_prices_per_address();
for (address, price) in feeds_per_address {
if let Some(od) = snapshot.price_feeds.get_mut(&address) {
if let OracleData::SwitchboardPull(feed) = od {
overwrite_price_from_sim(feed, &price);
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
}
}
});

let listen_to_updates_handle = tokio::spawn({
let context = context.clone();
async move { listen_to_updates(context).await }
Expand All @@ -226,6 +271,7 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> {
});

join_all([
update_crossbar_cache_handle,
listen_to_updates_handle,
process_account_updates_handle,
update_account_map_handle,
Expand All @@ -239,15 +285,14 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> {
async fn listen_to_updates(ctx: Arc<Context>) {
loop {
info!("Connecting geyser client");
let geyser_client_connection_result = GeyserGrpcClient::connect_with_timeout(
ctx.config.rpc_endpoint.to_string(),
Some(ctx.config.rpc_token.to_string()),
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
false,
)
.await;
let geyser_client_connection_result =
GeyserGrpcClient::build_from_shared(ctx.config.rpc_endpoint.to_string())
.unwrap()
.x_token(Some(ctx.config.rpc_token.to_string()))
.unwrap()
.connect()
.await;

info!("Connected");

let mut geyser_client = match geyser_client_connection_result {
Expand Down
Loading

0 comments on commit d33e649

Please sign in to comment.