diff --git a/Cargo.lock b/Cargo.lock index fe006403..f381e8e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12254,6 +12254,7 @@ dependencies = [ "serde_derive", "serde_json", "serde_yaml", + "sha2 0.10.9", "solana-account 3.1.0", "solana-account-decoder", "solana-address-lookup-table-interface 3.0.0", diff --git a/Cargo.toml b/Cargo.toml index 9dec37c5..006cf6eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,6 +158,7 @@ tracing = { version = "0.1.41", default-features = false } url = { version = "1.7.2", default-features = false } uuid = "1.15.1" zip = { version = "0.6", features = ["deflate"], default-features = false } +sha2 = { version = "0.10" } surfpool-core = { path = "crates/core", default-features = false } surfpool-db = { path = "crates/db" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index c92f489c..c6ef4d33 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -45,6 +45,7 @@ reqwest = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } # must match the serde version, see https://github.com/serde-rs/serde/issues/2584#issuecomment-1685252251 serde_json = { workspace = true } +sha2 = { workspace = true } serde_yaml = "0.9" solana-account = { workspace = true } solana-account-decoder = { workspace = true } diff --git a/crates/core/src/rpc/jito.rs b/crates/core/src/rpc/jito.rs new file mode 100644 index 00000000..da659493 --- /dev/null +++ b/crates/core/src/rpc/jito.rs @@ -0,0 +1,393 @@ +use std::str::FromStr; + +use jsonrpc_core::{Error, Result}; +use jsonrpc_derive::rpc; +use solana_client::{rpc_config::RpcSendTransactionConfig, rpc_custom_error::RpcCustomError}; +use solana_signature::Signature; + +use super::{ + RunloopContext, + full::{Full, SurfpoolFullRpc, SurfpoolRpcSendTransactionConfig}, +}; + +/// Jito-specific RPC methods for bundle submission +#[rpc] +pub trait Jito { + type Metadata; + + /// Sends a bundle of transactions to be processed sequentially. + /// + /// This RPC method accepts a bundle of transactions (Jito-compatible format) and processes them + /// one by one in order. All transactions in the bundle must succeed for the bundle to be accepted. + /// + /// ## Parameters + /// - `transactions`: An array of serialized transaction data (base64 or base58 encoded). + /// - `config`: Optional configuration for encoding format. + /// + /// ## Returns + /// - `Result`: A bundle ID (SHA-256 hash of comma-separated signatures). + /// + /// ## Example Request (JSON-RPC) + /// ```json + /// { + /// "jsonrpc": "2.0", + /// "id": 1, + /// "method": "sendBundle", + /// "params": [ + /// ["base64EncodedTx1", "base64EncodedTx2"], + /// { "encoding": "base64" } + /// ] + /// } + /// ``` + /// + /// ## Notes + /// - Transactions are processed sequentially in the order provided + /// - Each transaction must complete successfully before the next one starts + /// - If any transaction fails, the entire bundle is rejected + /// - The bundle ID is calculated as SHA-256 hash of comma-separated transaction signatures + #[rpc(meta, name = "sendBundle")] + fn send_bundle( + &self, + meta: Self::Metadata, + transactions: Vec, + config: Option, + ) -> Result; +} + +#[derive(Clone)] +pub struct SurfpoolJitoRpc; + +impl Jito for SurfpoolJitoRpc { + type Metadata = Option; + + fn send_bundle( + &self, + meta: Self::Metadata, + transactions: Vec, + config: Option, + ) -> Result { + if transactions.is_empty() { + return Err(Error::invalid_params("Bundle cannot be empty")); + } + + let Some(_ctx) = &meta else { + return Err(RpcCustomError::NodeUnhealthy { + num_slots_behind: None, + } + .into()); + }; + + let full_rpc = SurfpoolFullRpc; + let mut bundle_signatures = Vec::new(); + + // Process each transaction in the bundle sequentially using Full RPC + // Force skip_preflight to match Jito Block Engine behavior (no simulation on sendBundle) + for (idx, tx_data) in transactions.iter().enumerate() { + let base_config = config.clone().unwrap_or_default(); + let bundle_config = Some(SurfpoolRpcSendTransactionConfig { + base: RpcSendTransactionConfig { + skip_preflight: true, + ..base_config + }, + skip_sig_verify: None, + }); + // Delegate to Full RPC's sendTransaction method + match full_rpc.send_transaction(meta.clone(), tx_data.clone(), bundle_config) { + Ok(signature_str) => { + // Parse the signature to collect for bundle ID calculation + let signature = Signature::from_str(&signature_str).map_err(|e| { + Error::invalid_params(format!("Failed to parse signature: {e}")) + })?; + bundle_signatures.push(signature); + } + Err(e) => { + // Add bundle transaction index to error message + return Err(Error { + code: e.code, + message: format!("Bundle transaction {} failed: {}", idx, e.message), + data: e.data, + }); + } + } + } + + // Calculate bundle ID by hashing comma-separated signatures (Jito-compatible) + // https://github.com/jito-foundation/jito-solana/blob/master/sdk/src/bundle/mod.rs#L21 + use sha2::{Digest, Sha256}; + let concatenated_signatures = bundle_signatures + .iter() + .map(|sig| sig.to_string()) + .collect::>() + .join(","); + let mut hasher = Sha256::new(); + hasher.update(concatenated_signatures.as_bytes()); + let bundle_id = hasher.finalize(); + Ok(hex::encode(bundle_id)) + } +} + +#[cfg(test)] +mod tests { + use sha2::{Digest, Sha256}; + use solana_keypair::Keypair; + use solana_message::{VersionedMessage, v0::Message as V0Message}; + use solana_pubkey::Pubkey; + use solana_signer::Signer; + use solana_system_interface::instruction as system_instruction; + use solana_transaction::versioned::VersionedTransaction; + use surfpool_types::{SimnetCommand, TransactionConfirmationStatus, TransactionStatusEvent}; + + use super::*; + use crate::{ + tests::helpers::TestSetup, + types::{SurfnetTransactionStatus, TransactionWithStatusMeta}, + }; + + const LAMPORTS_PER_SOL: u64 = 1_000_000_000; + + fn build_v0_transaction( + payer: &Pubkey, + signers: &[&Keypair], + instructions: &[solana_instruction::Instruction], + recent_blockhash: &solana_hash::Hash, + ) -> VersionedTransaction { + let msg = VersionedMessage::V0( + V0Message::try_compile(payer, instructions, &[], *recent_blockhash).unwrap(), + ); + VersionedTransaction::try_new(msg, signers).unwrap() + } + + #[test] + fn test_send_bundle_empty_bundle_rejected() { + let setup = TestSetup::new(SurfpoolJitoRpc); + let result = setup.rpc.send_bundle(Some(setup.context), vec![], None); + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.message.contains("Bundle cannot be empty"), + "Expected 'Bundle cannot be empty' error, got: {}", + err.message + ); + } + + #[test] + fn test_send_bundle_no_context_returns_unhealthy() { + let setup = TestSetup::new(SurfpoolJitoRpc); + let result = setup + .rpc + .send_bundle(None, vec!["some_tx".to_string()], None); + + assert!(result.is_err()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_send_bundle_single_transaction() { + let payer = Keypair::new(); + let recipient = Pubkey::new_unique(); + let (mempool_tx, mempool_rx) = crossbeam_channel::unbounded(); + let setup = TestSetup::new_with_mempool(SurfpoolJitoRpc, mempool_tx); + let recent_blockhash = setup + .context + .svm_locker + .with_svm_reader(|svm_reader| svm_reader.latest_blockhash()); + + // Airdrop to payer + let _ = setup + .context + .svm_locker + .0 + .write() + .await + .airdrop(&payer.pubkey(), 2 * LAMPORTS_PER_SOL); + + let tx = build_v0_transaction( + &payer.pubkey(), + &[&payer], + &[system_instruction::transfer( + &payer.pubkey(), + &recipient, + LAMPORTS_PER_SOL, + )], + &recent_blockhash, + ); + let tx_encoded = bs58::encode(bincode::serialize(&tx).unwrap()).into_string(); + let expected_sig = tx.signatures[0]; + + let setup_clone = setup.clone(); + let handle = hiro_system_kit::thread_named("send_bundle") + .spawn(move || { + setup_clone + .rpc + .send_bundle(Some(setup_clone.context), vec![tx_encoded], None) + }) + .unwrap(); + + // Process the transaction from mempool + loop { + match mempool_rx.recv() { + Ok(SimnetCommand::ProcessTransaction(_, tx, status_tx, _, _)) => { + let mut writer = setup.context.svm_locker.0.write().await; + let slot = writer.get_latest_absolute_slot(); + writer.transactions_queued_for_confirmation.push_back(( + tx.clone(), + status_tx.clone(), + None, + )); + let sig = tx.signatures[0]; + let tx_with_status_meta = TransactionWithStatusMeta { + slot, + transaction: tx, + ..Default::default() + }; + let mutated_accounts = std::collections::HashSet::new(); + writer + .transactions + .store( + sig.to_string(), + SurfnetTransactionStatus::processed( + tx_with_status_meta, + mutated_accounts, + ), + ) + .unwrap(); + status_tx + .send(TransactionStatusEvent::Success( + TransactionConfirmationStatus::Confirmed, + )) + .unwrap(); + break; + } + Ok(SimnetCommand::AirdropProcessed) => continue, + _ => panic!("failed to receive transaction from mempool"), + } + } + + let result = handle.join().unwrap(); + assert!(result.is_ok(), "Bundle should succeed: {:?}", result); + + // Verify bundle ID is SHA-256 of the signature + let bundle_id = result.unwrap(); + let mut hasher = Sha256::new(); + hasher.update(expected_sig.to_string().as_bytes()); + let expected_bundle_id = hex::encode(hasher.finalize()); + assert_eq!( + bundle_id, expected_bundle_id, + "Bundle ID should match SHA-256 of signature" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_send_bundle_multiple_transactions() { + let payer = Keypair::new(); + let recipient1 = Pubkey::new_unique(); + let recipient2 = Pubkey::new_unique(); + let (mempool_tx, mempool_rx) = crossbeam_channel::unbounded(); + let setup = TestSetup::new_with_mempool(SurfpoolJitoRpc, mempool_tx); + let recent_blockhash = setup + .context + .svm_locker + .with_svm_reader(|svm_reader| svm_reader.latest_blockhash()); + + // Airdrop to payer + let _ = setup + .context + .svm_locker + .0 + .write() + .await + .airdrop(&payer.pubkey(), 5 * LAMPORTS_PER_SOL); + + let tx1 = build_v0_transaction( + &payer.pubkey(), + &[&payer], + &[system_instruction::transfer( + &payer.pubkey(), + &recipient1, + LAMPORTS_PER_SOL, + )], + &recent_blockhash, + ); + let tx2 = build_v0_transaction( + &payer.pubkey(), + &[&payer], + &[system_instruction::transfer( + &payer.pubkey(), + &recipient2, + LAMPORTS_PER_SOL, + )], + &recent_blockhash, + ); + + let tx1_encoded = bs58::encode(bincode::serialize(&tx1).unwrap()).into_string(); + let tx2_encoded = bs58::encode(bincode::serialize(&tx2).unwrap()).into_string(); + let expected_sig1 = tx1.signatures[0]; + let expected_sig2 = tx2.signatures[0]; + + let setup_clone = setup.clone(); + let handle = hiro_system_kit::thread_named("send_bundle") + .spawn(move || { + setup_clone.rpc.send_bundle( + Some(setup_clone.context), + vec![tx1_encoded, tx2_encoded], + None, + ) + }) + .unwrap(); + + // Process both transactions from mempool + let mut processed_count = 0; + while processed_count < 2 { + match mempool_rx.recv() { + Ok(SimnetCommand::ProcessTransaction(_, tx, status_tx, _, _)) => { + let mut writer = setup.context.svm_locker.0.write().await; + let slot = writer.get_latest_absolute_slot(); + writer.transactions_queued_for_confirmation.push_back(( + tx.clone(), + status_tx.clone(), + None, + )); + let sig = tx.signatures[0]; + let tx_with_status_meta = TransactionWithStatusMeta { + slot, + transaction: tx, + ..Default::default() + }; + let mutated_accounts = std::collections::HashSet::new(); + writer + .transactions + .store( + sig.to_string(), + SurfnetTransactionStatus::processed( + tx_with_status_meta, + mutated_accounts, + ), + ) + .unwrap(); + status_tx + .send(TransactionStatusEvent::Success( + TransactionConfirmationStatus::Confirmed, + )) + .unwrap(); + processed_count += 1; + } + Ok(SimnetCommand::AirdropProcessed) => continue, + _ => panic!("failed to receive transaction from mempool"), + } + } + + let result = handle.join().unwrap(); + assert!(result.is_ok(), "Bundle should succeed: {:?}", result); + + // Verify bundle ID is SHA-256 of comma-separated signatures + let bundle_id = result.unwrap(); + let concatenated = format!("{},{}", expected_sig1, expected_sig2); + let mut hasher = Sha256::new(); + hasher.update(concatenated.as_bytes()); + let expected_bundle_id = hex::encode(hasher.finalize()); + assert_eq!( + bundle_id, expected_bundle_id, + "Bundle ID should match SHA-256 of comma-separated signatures" + ); + } +} diff --git a/crates/core/src/rpc/mod.rs b/crates/core/src/rpc/mod.rs index 7da50531..19096862 100644 --- a/crates/core/src/rpc/mod.rs +++ b/crates/core/src/rpc/mod.rs @@ -26,6 +26,7 @@ pub mod accounts_scan; pub mod admin; pub mod bank_data; pub mod full; +pub mod jito; pub mod minimal; pub mod surfnet_cheatcodes; pub mod utils; diff --git a/crates/core/src/runloops/mod.rs b/crates/core/src/runloops/mod.rs index 43b6b242..0036d5ff 100644 --- a/crates/core/src/runloops/mod.rs +++ b/crates/core/src/runloops/mod.rs @@ -47,7 +47,7 @@ use crate::{ rpc::{ self, RunloopContext, SurfpoolMiddleware, SurfpoolWebsocketMeta, SurfpoolWebsocketMiddleware, accounts_data::AccountsData, accounts_scan::AccountsScan, - admin::AdminRpc, bank_data::BankData, full::Full, minimal::Minimal, + admin::AdminRpc, bank_data::BankData, full::Full, jito::Jito, minimal::Minimal, surfnet_cheatcodes::SurfnetCheatcodes, ws::Rpc, }, surfnet::{GeyserEvent, locker::SurfnetSvmLocker, remote::SurfnetRemoteClient}, @@ -858,6 +858,7 @@ async fn start_http_rpc_server_runloop( let mut io = MetaIoHandler::with_middleware(middleware); io.extend_with(rpc::minimal::SurfpoolMinimalRpc.to_delegate()); io.extend_with(rpc::full::SurfpoolFullRpc.to_delegate()); + io.extend_with(rpc::jito::SurfpoolJitoRpc.to_delegate()); io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate()); io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate()); io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate());