Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

benchnew txpooler startup race #411

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-util = "0.7"
tracing-subscriber = { workspace = true, features = ["std", "env-filter"] }
rand = "0.8.5"
rand_chacha = "0.3.1"
Expand Down
10 changes: 7 additions & 3 deletions bench/src/benches/confirmation_rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ pub async fn send_bulk_txs_and_wait(
max_timeout: Duration,
) -> anyhow::Result<Metric> {
trace!("Get latest blockhash and generate transactions");
let hash = rpc.get_latest_blockhash().await.map_err(|err| {
let recent_blockhash = rpc.get_latest_blockhash().await.map_err(|err| {
log::error!("Error get latest blockhash : {err:?}");
err
})?;
let mut rng = create_rng(None);
let txs = generate_txs(num_txs, payer, hash, &mut rng, tx_params);
let txs = generate_txs(num_txs, payer, recent_blockhash, &mut rng, tx_params);

trace!("Sending {} transactions in bulk ..", txs.len());
trace!(
"Sending {} transactions with blockhash {} to RPC sendTransaction in bulk ..",
txs.len(),
recent_blockhash
);
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(
rpc,
Expand Down
45 changes: 27 additions & 18 deletions bench/src/benches/rpc_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use futures::TryFutureExt;
use itertools::Itertools;
use log::{debug, trace, warn};

use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client_api::client_error::ErrorKind;
Expand All @@ -23,6 +22,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use url::Url;

pub fn create_rpc_client(rpc_url: &Url) -> RpcClient {
Expand All @@ -34,7 +34,7 @@ pub enum ConfirmationResponseFromRpc {
// RPC error on send_transaction
SendError(Arc<ErrorKind>),
// (sent slot at confirmed commitment, confirmed slot, ..., ...)
// transaction_confirmation_status is "confirmed" (finalized is not reported by blockSubscribe websocket
// transaction_confirmation_status is "confirmed"
Success(Slot, Slot, TransactionConfirmationStatus, Duration),
// timout waiting for confirmation status
Timeout(Duration),
Expand All @@ -47,11 +47,11 @@ pub async fn send_and_confirm_bulk_transactions(
txs: &[VersionedTransaction],
max_timeout: Duration,
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
trace!("Polling for next slot ..");
let send_slot = poll_next_slot_start(rpc_client)
.await
.context("poll for next start slot")?;
trace!("Send slot: {}", send_slot);
debug!(
"send_transaction for {} txs with timeout {:.03}s",
txs.len(),
max_timeout.as_secs_f32()
);

let send_config = RpcSendTransactionConfig {
skip_preflight: true,
Expand All @@ -61,14 +61,27 @@ pub async fn send_and_confirm_bulk_transactions(
min_context_slot: None,
};

// note: we get confirmed but never finaliized
let tx_listener_startup_token = CancellationToken::new();

// note: we get confirmed but not finalized
let tx_listener_startup_token_cp = tx_listener_startup_token.clone();
let (tx_status_map, _jh_collector) = start_tx_status_collector(
tx_status_websocket_addr.clone(),
payer_pubkey,
CommitmentConfig::confirmed(),
tx_listener_startup_token_cp,
)
.await;

// waiting for thread to cancel the token
tx_listener_startup_token.cancelled().await;

trace!("Waiting for next slot before sending transactions ..");
let send_slot = poll_next_slot_start(rpc_client)
.await
.context("poll for next start slot")?;
trace!("Send slot: {}", send_slot);

let started_at = Instant::now();
trace!(
"Sending {} transactions via RPC (retries=off) ..",
Expand All @@ -87,7 +100,7 @@ pub async fn send_and_confirm_bulk_transactions(
.context("get slot afterwards")?;

if after_send_slot - send_slot > 0 {
warn!(
debug!(
"Slot advanced during sending transactions: {} -> {}",
send_slot, after_send_slot
);
Expand All @@ -110,9 +123,9 @@ pub async fn send_and_confirm_bulk_transactions(
for (i, tx_sig) in txs.iter().enumerate() {
let tx_sent = batch_sigs_or_fails[i].is_ok();
if tx_sent {
trace!("- tx_sent {}", tx_sig.get_signature());
debug!("- tx_sent {}", tx_sig.get_signature());
} else {
trace!("- tx_fail {}", tx_sig.get_signature());
debug!("- tx_fail {}", tx_sig.get_signature());
}
}
let elapsed = started_at.elapsed();
Expand Down Expand Up @@ -147,13 +160,9 @@ pub async fn send_and_confirm_bulk_transactions(

// items get moved from pending_status_set to result_status_map

debug!(
"Waiting for transaction confirmations from websocket source <{}> ..",
obfuscate_rpcurl(tx_status_websocket_addr.as_str())
);
let started_at = Instant::now();
let timeout_at = started_at + max_timeout;
// "poll" the status dashmap
// "poll" the status dashmap which gets updated by the tx status collector task
'polling_loop: for iteration in 1.. {
let iteration_ends_at = started_at + Duration::from_millis(iteration * 100);
assert_eq!(
Expand All @@ -170,7 +179,7 @@ pub async fn send_and_confirm_bulk_transactions(
// status is confirmed
if pending_status_set.remove(tx_sig) {
trace!(
"take status for sig {:?} and confirmed_slot: {:?} from websocket source",
"websocket source tx status for sig {:?} and confirmed_slot: {:?}",
tx_sig,
confirmed_slot
);
Expand Down Expand Up @@ -255,7 +264,7 @@ pub async fn poll_next_slot_start(rpc_client: &RpcClient) -> Result<Slot, Error>
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
trace!("polling slot {}", slot);
trace!(".. polling slot {}", slot);
if let Some(last_slot) = last_slot {
if last_slot + 1 == slot {
break slot;
Expand Down
15 changes: 9 additions & 6 deletions bench/src/benches/tx_status_websocket_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::AbortHandle;
use tokio_util::sync::CancellationToken;
use url::Url;
use websocket_tungstenite_retry::websocket_stable;
use websocket_tungstenite_retry::websocket_stable::WsMessage;

// returns map of transaction signatures to the slot they were confirmed
// returns map of transaction signatures to the slot they were confirmed (or finalized)
// the caller must await for the token to be cancelled to prevent startup race condition
pub async fn start_tx_status_collector(
ws_url: Url,
payer_pubkey: Pubkey,
commitment_config: CommitmentConfig,
startup_token: CancellationToken,
) -> (Arc<DashMap<Signature, Slot>>, AbortHandle) {
// e.g. "commitment"
let commitment_str = format!("{:?}", commitment_config);

// note: no commitment paramter is provided; according to the docs we get confirmed+finalized but never processed
let mut web_socket_slots = websocket_stable::StableWebSocket::new_with_timeout(
ws_url,
json!({
Expand Down Expand Up @@ -52,6 +55,9 @@ pub async fn start_tx_status_collector(

let observed_transactions_write = Arc::downgrade(&observed_transactions);
let jh = tokio::spawn(async move {
// notify the caller that we are ready to receive messages
startup_token.cancel();
debug!("Websocket subscription to 'blockSubscribe' is ready to observe signatures in confirmed blocks");
while let Ok(msg) = channel.recv().await {
if let WsMessage::Text(payload) = msg {
let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> =
Expand All @@ -67,10 +73,7 @@ pub async fn start_tx_status_collector(
{
for tx_sig in tx_sigs_from_block {
let tx_sig = Signature::from_str(&tx_sig).unwrap();
debug!(
"Transaction signature found in block: {} - slot {}",
tx_sig, slot
);
debug!("Transaction signature found in block {}: {}", slot, tx_sig);
map.entry(tx_sig).or_insert(slot);
}
}
Expand Down
2 changes: 1 addition & 1 deletion bench/src/benchnew.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ enum SubCommand {
#[clap(short, long)]
size_tx: TxSize,
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
#[clap(short, long, default_value_t = 15_000)]
#[clap(short, long, default_value_t = 30_000)]
max_timeout_ms: u64,
#[clap(short, long)]
num_of_runs: usize,
Expand Down
Loading