Skip to content

Commit

Permalink
fix: ping madness
Browse files Browse the repository at this point in the history
  • Loading branch information
losman0s committed Jan 5, 2024
1 parent bcf579c commit e3a6ecb
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 93 deletions.
29 changes: 10 additions & 19 deletions observability/indexer/src/commands/index_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn listen_to_updates(ctx: Arc<Context>) {
let geyser_client_connection_result = GeyserGrpcClient::connect(
ctx.config.rpc_endpoint.to_string(),
Some(ctx.config.rpc_token.to_string()),
Some(ClientTlsConfig::new()),
None,
);

let mut geyser_client = match geyser_client_connection_result {
Expand All @@ -130,15 +130,6 @@ async fn listen_to_updates(ctx: Arc<Context>) {
}
};

// Establish streams
let (mut subscribe_request_sink, mut stream) = match geyser_client.subscribe().await {
Ok(value) => value,
Err(e) => {
error!("Error subscribing geyser client {e}");
continue;
}
};

let subscribe_request = SubscribeRequest {
accounts: HashMap::from_iter([(
ctx.config.program_id.to_string(),
Expand All @@ -152,20 +143,20 @@ async fn listen_to_updates(ctx: Arc<Context>) {
"slots".to_string(),
SubscribeRequestFilterSlots::default(),
)]),
transactions: HashMap::default(),
blocks: HashMap::default(),
ping: Some(SubscribeRequestPing::default()),
..Default::default()
};

// Send initial subscription config
match subscribe_request_sink.send(subscribe_request).await {
Ok(()) => info!("Successfully sent initial subscription config"),
// Establish streams
let (mut subscribe_request_sink, mut stream) = match geyser_client
.subscribe_with_request(Some(subscribe_request))
.await
{
Ok(value) => value,
Err(e) => {
error!("Error establishing geyser sub: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
error!("Error subscribing geyser client {e}");
continue;
}
}
};

while let Some(received) = stream.next().await {
match received {
Expand Down
65 changes: 25 additions & 40 deletions observability/indexer/src/commands/index_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Result;
use base64::{engine::general_purpose, Engine};
use chrono::{DateTime, Utc};
use envconfig::Envconfig;
use futures::{future::join_all, SinkExt, StreamExt};
use futures::{future::join_all, StreamExt};
use google_cloud_default::WithAuthExt;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::client::{Client, ClientConfig};
Expand All @@ -21,17 +21,15 @@ use std::{
},
time::Duration,
};
use tonic::Status;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::{
convert_from,
geyser::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeRequestPing,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
},
tonic::transport::ClientTlsConfig,
};

#[derive(Envconfig, Debug, Clone)]
Expand All @@ -42,8 +40,6 @@ pub struct IndexTransactionsConfig {
pub rpc_token: String,
#[envconfig(from = "INDEX_TRANSACTIONS_SLOTS_BUFFER_SIZE")]
pub slots_buffer_size: u32,
#[envconfig(from = "INDEX_TRANSACTIONS_MAX_CONCURRENT_REQUESTS")]
pub max_concurrent_requests: usize,
#[envconfig(from = "INDEX_TRANSACTIONS_MONITOR_INTERVAL")]
pub monitor_interval: u64,
#[envconfig(from = "INDEX_TRANSACTIONS_PROGRAM_ID")]
Expand Down Expand Up @@ -118,11 +114,16 @@ pub async fn index_transactions(config: IndexTransactionsConfig) -> Result<()> {
async fn listen_to_updates(ctx: Arc<Context>) {
loop {
info!("Connecting geyser client");
let geyser_client_connection_result = GeyserGrpcClient::connect(
let geyser_client_connection_result = GeyserGrpcClient::connect_with_timeout(
ctx.config.rpc_endpoint.to_string(),
Some(ctx.config.rpc_token.to_string()),
Some(ClientTlsConfig::new()),
);
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
false,
)
.await;
info!("Connected");

let mut geyser_client = match geyser_client_connection_result {
Ok(geyser_client) => geyser_client,
Expand All @@ -133,20 +134,12 @@ async fn listen_to_updates(ctx: Arc<Context>) {
}
};

// Establish streams
let (mut subscribe_request_sink, mut stream) = match geyser_client.subscribe().await {
Ok(value) => value,
Err(e) => {
error!("Error subscribing geyser client {e}");
continue;
}
};

let subscribe_request = SubscribeRequest {
accounts: HashMap::default(),
slots: HashMap::from_iter([(
"slots".to_string(),
SubscribeRequestFilterSlots::default(),
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(false),
},
)]),
transactions: HashMap::from_iter([(
ctx.config.program_id.to_string(),
Expand All @@ -158,18 +151,21 @@ async fn listen_to_updates(ctx: Arc<Context>) {
..Default::default()
},
)]),
ping: Some(SubscribeRequestPing::default()),
commitment: Some(CommitmentLevel::Processed as i32),
..Default::default()
};

// Send initial subscription config
match subscribe_request_sink.send(subscribe_request).await {
Ok(()) => info!("Successfully sent initial subscription config"),
// Establish streams
let (_, mut stream) = match geyser_client
.subscribe_with_request(Some(subscribe_request))
.await
{
Ok(value) => value,
Err(e) => {
error!("Error establishing geyser sub: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
error!("Error subscribing geyser client {e}");
continue;
}
}
};

while let Some(received) = stream.next().await {
match received {
Expand Down Expand Up @@ -237,7 +233,6 @@ fn process_update(ctx: Arc<Context>, filters: &[String], update: UpdateOneof) ->
indexing_addresses: filters.to_vec(),
transaction,
});
// println!("slot_transactions for {:?} at {}: {}", filters.to_vec(), transaction_update.slot, slot_transactions.len());
} else {
anyhow::bail!("Expected `transaction` in `UpdateOneof::Transaction` update");
}
Expand All @@ -258,7 +253,7 @@ fn process_update(ctx: Arc<Context>, filters: &[String], update: UpdateOneof) ->
debug!("ping");
}
_ => {
warn!("unknown update");
warn!("unknown update: {:?}", update);
}
}

Expand Down Expand Up @@ -327,16 +322,6 @@ pub async fn push_transactions_to_pubsub(ctx: Arc<Context>) -> Result<()> {

transactions_data.iter().for_each(|transaction_data| {
ctx.transactions_counter.fetch_add(1, Ordering::Relaxed);
// println!(
// "{:?} - {}",
// transaction_data.indexing_addresses,
// transaction_data
// .transaction
// .transaction
// .signatures
// .first()
// .unwrap()
// );

let now = Utc::now();

Expand Down
67 changes: 33 additions & 34 deletions observability/indexer/src/commands/snapshot_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::Result;
use chrono::{DateTime, Utc};
use envconfig::Envconfig;
use futures::SinkExt;
use futures::{future::join_all, pin_mut, StreamExt};
use futures::{future::join_all, StreamExt};
use gcp_bigquery_client::model::table_data_insert_all_request::TableDataInsertAllRequest;
use itertools::Itertools;
use rayon::prelude::*;
Expand All @@ -33,9 +33,8 @@ use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots, SubscribeRequestPing,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yup_oauth2::parse_service_account_key;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -165,13 +164,17 @@ async fn compute_geyser_config(
},
),
]),
slots: HashMap::from_iter([("slots".to_string(), SubscribeRequestFilterSlots::default())]),
transactions: HashMap::default(),
blocks: HashMap::from_iter([(
"blocks".to_string(),
SubscribeRequestFilterBlocks::default(),
slots: HashMap::from_iter([(
"slots".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(false),
},
)]),
blocks_meta: HashMap::from_iter([(
"blocks_meta".to_string(),
SubscribeRequestFilterBlocksMeta::default(),
)]),
ping: Some(SubscribeRequestPing::default()),
commitment: Some(CommitmentLevel::Processed as i32),
..Default::default()
}
}
Expand Down Expand Up @@ -236,13 +239,18 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> {
async fn listen_to_updates(ctx: Arc<Context>) {
loop {
info!("Connecting geyser client");
let geyser_client_connection_result = GeyserGrpcClient::connect(
ctx.config.rpc_endpoint_geyser.to_string(),
let geyser_client_connection_result = GeyserGrpcClient::connect_with_timeout(
ctx.config.rpc_endpoint.to_string(),
Some(ctx.config.rpc_token.to_string()),
Some(ClientTlsConfig::new()),
);
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
false,
)
.await;
info!("Connected");

let geyser_client = match geyser_client_connection_result {
let mut geyser_client = match geyser_client_connection_result {
Ok(geyser_client) => geyser_client,
Err(err) => {
error!("Error connecting to geyser client: {}", err);
Expand All @@ -251,37 +259,28 @@ async fn listen_to_updates(ctx: Arc<Context>) {
}
};

let geyser_client = Box::pin(geyser_client);
pin_mut!(geyser_client);

// Establish streams
let (mut subscribe_request_sink, mut stream) = match geyser_client.subscribe().await {
let geyser_sub_config = ctx.geyser_subscription_config.lock().await;
let (mut subscribe_request_sink, mut stream) = match geyser_client
.subscribe_with_request(Some(geyser_sub_config.1.clone()))
.await
{
Ok(value) => value,
Err(e) => {
error!("Error subscribing geyser client {e}");
continue;
}
};
drop(geyser_sub_config);

// Send initial subscription config
let geyser_sub_config = ctx.geyser_subscription_config.lock().await;
match subscribe_request_sink
.send(geyser_sub_config.1.clone())
.await
{
Ok(()) => info!("Successfully sent initial subscription config"),
Err(e) => {
error!("Error establishing geyser sub: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
debug!("Starting to listen to updates");

// Main loop
while let Some(received) = stream.next().await {
// Check if we need to update the subscription
let mut geyser_sub_config = ctx.geyser_subscription_config.lock().await;
if geyser_sub_config.0 {
warn!("Config update: {:?}", geyser_sub_config.1);
debug!("Config update");
geyser_sub_config.0 = false;

match subscribe_request_sink
Expand Down Expand Up @@ -370,8 +369,8 @@ async fn process_update(ctx: Arc<Context>, update: UpdateOneof) -> Result<()> {
}
}
}
UpdateOneof::Block(block_update) => {
if let Some(block_time) = block_update.block_time {
UpdateOneof::BlockMeta(block_meta_update) => {
if let Some(block_time) = block_meta_update.block_time {
ctx.timestamp.store(block_time.timestamp, Ordering::Relaxed);
}
}
Expand Down Expand Up @@ -460,7 +459,7 @@ pub async fn update_account_map(ctx: Arc<Context>) {
.collect_vec();
let updated_geyser_config =
compute_geyser_config(&ctx.config, &non_program_accounts).await;
info!("updating geyser sub");
debug!("updating geyser sub");
*ctx.geyser_subscription_config.lock().await = (true, updated_geyser_config);
}
}
Expand Down

0 comments on commit e3a6ecb

Please sign in to comment.