From 64c5288d28262954d0b36f1b3fb2eccdbe789b60 Mon Sep 17 00:00:00 2001 From: man0s <95379755+losman0s@users.noreply.github.com> Date: Fri, 18 Oct 2024 19:17:08 +0800 Subject: [PATCH] fix: crossbar calls --- observability/indexer/Cargo.toml | 4 +- observability/indexer/Dockerfile | 4 +- .../indexer/src/commands/snapshot_accounts.rs | 17 ++++++- observability/indexer/src/utils/crossbar.rs | 46 ++++++++++++++----- 4 files changed, 53 insertions(+), 18 deletions(-) diff --git a/observability/indexer/Cargo.toml b/observability/indexer/Cargo.toml index ad13b5a2..48aa7a79 100644 --- a/observability/indexer/Cargo.toml +++ b/observability/indexer/Cargo.toml @@ -41,8 +41,8 @@ google-cloud-googleapis = { git = " https://github.com/mrgnlabs/google-cloud-rus yup-oauth2 = "8.3.0" yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" } yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" } -switchboard-on-demand-client = "0.1.7" -switchboard-on-demand = "0.1.7" +switchboard-on-demand-client = "0.2.4" +switchboard-on-demand = "0.1.15" hex = "0.4.3" fixed = "1.12.0" fixed-macro = "1.2.0" diff --git a/observability/indexer/Dockerfile b/observability/indexer/Dockerfile index 708af0a7..ed1b39ef 100644 --- a/observability/indexer/Dockerfile +++ b/observability/indexer/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.72 as builder +FROM rust:1.75 as builder RUN apt-get update -y && apt-get install -y pkg-config build-essential libudev-dev clang cmake protobuf-compiler RUN rustup component add rustfmt clippy @@ -15,7 +15,7 @@ COPY ./clients/rust ./clients/rust ENV CARGO_NET_GIT_FETCH_WITH_CLI=true -RUN cargo build --release +RUN cargo build --release --locked FROM debian:stable-slim as runner diff --git a/observability/indexer/src/commands/snapshot_accounts.rs b/observability/indexer/src/commands/snapshot_accounts.rs index 641bcf7e..f62d7192 100644 --- a/observability/indexer/src/commands/snapshot_accounts.rs +++ b/observability/indexer/src/commands/snapshot_accounts.rs @@ -211,7 +211,7 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> { }) .collect::>(), ); - context.crossbar_store.refresh_prices().await; + context.crossbar_store.refresh_prices().await.unwrap(); snapshot .routing_lookup @@ -236,7 +236,20 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> { let context = context.clone(); async move { loop { - context.crossbar_store.refresh_prices().await; + let mut retry_count = 0; + while retry_count < 3 { + match context.crossbar_store.refresh_prices().await { + Ok(_) => break, + Err(e) => { + retry_count += 1; + if retry_count == 3 { + error!("Failed to refresh prices after 3 attempts: {:?}", e); + } else { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } + } let mut snapshot = context.account_snapshot.lock().await; let feeds_per_address: HashMap = context.crossbar_store.get_prices_per_address(); diff --git a/observability/indexer/src/utils/crossbar.rs b/observability/indexer/src/utils/crossbar.rs index 377960d1..b11e03b2 100644 --- a/observability/indexer/src/utils/crossbar.rs +++ b/observability/indexer/src/utils/crossbar.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use solana_sdk::pubkey::Pubkey; use std::{collections::HashMap, sync::Mutex}; use switchboard_on_demand_client::CrossbarClient; @@ -27,7 +28,7 @@ pub struct CrossbarCache { impl CrossbarCache { /// Creates a new CrossbarCache empty instance pub fn new() -> Self { - let crossbar_client = CrossbarClient::default(None); + let crossbar_client = CrossbarClient::default(); Self { crossbar_client, feeds: Mutex::new(HashMap::new()), @@ -50,24 +51,43 @@ impl CrossbarCache { } } - pub async fn refresh_prices(&self) { + pub async fn refresh_prices(&self) -> Result<()> { if self.feeds.lock().unwrap().is_empty() { - return; + return Ok(()); } - let feed_hashes = self + let feed_hashes: Vec = self .feeds .lock() .unwrap() .values() .map(|feed| feed.feed_meta.feed_hash.clone()) - .collect::>(); - - let simulated_prices = self - .crossbar_client - .simulate_feeds(&feed_hashes.iter().map(|x| x.as_str()).collect::>()) - .await - .unwrap(); + .collect(); + + const CHUNK_SIZE: usize = 20; + + let chunk_futures: Vec<_> = feed_hashes + .chunks(CHUNK_SIZE) + .map(|chunk| { + let client = self.crossbar_client.clone(); + let chunk_vec: Vec = chunk.to_vec(); + tokio::spawn(async move { + client + .simulate_feeds( + &chunk_vec.iter().map(|x| x.as_str()).collect::>(), + ) + .await + }) + }) + .collect(); + + let chunk_results = futures::future::try_join_all(chunk_futures).await?; + let mut simulated_prices = Vec::new(); + for result in chunk_results { + if let Ok(chunk_result) = result { + simulated_prices.extend(chunk_result); + } + } let timestamp = chrono::Utc::now().timestamp(); @@ -83,6 +103,8 @@ impl CrossbarCache { } } } + + Ok(()) } pub fn get_prices_per_address(&self) -> HashMap { @@ -136,7 +158,7 @@ mod tests { feed_hash: feed_hash2.clone(), }, ]); - crossbar_maintainer.refresh_prices().await; + crossbar_maintainer.refresh_prices().await.unwrap(); println!("Price: {:?}", price.lock().unwrap()); println!("Price2: {:?}", price2.lock().unwrap()); }