
Remove new_thread_local_multi_iterator
Towards: anza-xyz#3357
ksolana committed Nov 23, 2024
1 parent 31742ca commit 4695fbb
Showing 1 changed file with 3 additions and 88 deletions.
91 changes: 3 additions & 88 deletions core/src/banking_stage.rs
@@ -415,92 +415,6 @@ impl BankingStage {
         }
     }
 
-    #[allow(clippy::too_many_arguments)]
-    pub fn new_thread_local_multi_iterator(
-        cluster_info: &impl LikeClusterInfo,
-        poh_recorder: &Arc<RwLock<PohRecorder>>,
-        non_vote_receiver: BankingPacketReceiver,
-        tpu_vote_receiver: BankingPacketReceiver,
-        gossip_vote_receiver: BankingPacketReceiver,
-        num_threads: u32,
-        transaction_status_sender: Option<TransactionStatusSender>,
-        replay_vote_sender: ReplayVoteSender,
-        log_messages_bytes_limit: Option<usize>,
-        connection_cache: Arc<ConnectionCache>,
-        bank_forks: Arc<RwLock<BankForks>>,
-        prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
-    ) -> Self {
-        assert!(num_threads >= MIN_TOTAL_THREADS);
-        // Single thread to generate entries from many banks.
-        // This thread talks to poh_service and broadcasts the entries once they have been recorded.
-        // Once an entry has been recorded, its blockhash is registered with the bank.
-        let data_budget = Arc::new(DataBudget::default());
-        let batch_limit =
-            TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize);
-        // Keeps track of extraneous vote transactions for the vote threads
-        let latest_unprocessed_votes = {
-            let bank = bank_forks.read().unwrap().working_bank();
-            Arc::new(LatestUnprocessedVotes::new(&bank))
-        };
-
-        let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
-        let committer = Committer::new(
-            transaction_status_sender.clone(),
-            replay_vote_sender.clone(),
-            prioritization_fee_cache.clone(),
-        );
-        let transaction_recorder = poh_recorder.read().unwrap().new_recorder();
-
-        // Many banks that process transactions in parallel.
-        let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
-            .map(|id| {
-                let (packet_receiver, unprocessed_transaction_storage) = match id {
-                    0 => (
-                        gossip_vote_receiver.clone(),
-                        UnprocessedTransactionStorage::new_vote_storage(
-                            latest_unprocessed_votes.clone(),
-                            VoteSource::Gossip,
-                        ),
-                    ),
-                    1 => (
-                        tpu_vote_receiver.clone(),
-                        UnprocessedTransactionStorage::new_vote_storage(
-                            latest_unprocessed_votes.clone(),
-                            VoteSource::Tpu,
-                        ),
-                    ),
-                    _ => (
-                        non_vote_receiver.clone(),
-                        UnprocessedTransactionStorage::new_transaction_storage(
-                            UnprocessedPacketBatches::with_capacity(batch_limit),
-                            ThreadType::Transactions,
-                        ),
-                    ),
-                };
-
-                let forwarder = Forwarder::new(
-                    poh_recorder.clone(),
-                    bank_forks.clone(),
-                    cluster_info.clone(),
-                    connection_cache.clone(),
-                    data_budget.clone(),
-                );
-
-                Self::spawn_thread_local_multi_iterator_thread(
-                    id,
-                    packet_receiver,
-                    decision_maker.clone(),
-                    committer.clone(),
-                    transaction_recorder.clone(),
-                    log_messages_bytes_limit,
-                    forwarder,
-                    unprocessed_transaction_storage,
-                )
-            })
-            .collect();
-        Self { bank_thread_hdls }
-    }
-
     #[allow(clippy::too_many_arguments)]
     pub fn new_central_scheduler(
         cluster_info: &impl LikeClusterInfo,
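
One thing worth spelling out from the deleted body: the receiver-to-thread assignment was positional, and the packet budget was split only across the non-vote threads. Below is a minimal, self-contained sketch of that dispatch and of the batch_limit arithmetic; Role, role_for_thread, and the concrete constant values are illustrative assumptions, not names or numbers taken from the codebase.

// Sketch of the per-thread dispatch used by the removed constructor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    GossipVotes, // thread 0: votes arriving over gossip
    TpuVotes,    // thread 1: votes arriving over the TPU vote port
    NonVote,     // threads 2.. : ordinary transactions
}

fn role_for_thread(id: u32) -> Role {
    match id {
        0 => Role::GossipVotes,
        1 => Role::TpuVotes,
        _ => Role::NonVote,
    }
}

fn main() {
    // Assumed figures: with a 700_000-packet buffer, 2 vote-processing
    // threads, and num_threads = 3, the single non-vote thread gets the
    // whole budget: 700_000 / (3 - 2) = 700_000.
    let (total_buffered_packets, num_vote_threads, num_threads) = (700_000u32, 2u32, 3u32);
    let batch_limit = total_buffered_packets / (num_threads - num_vote_threads);
    for id in 0..num_threads {
        println!("thread {id} -> {:?}", role_for_thread(id));
    }
    println!("batch_limit per non-vote thread: {batch_limit}");
}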
@@ -1183,19 +1097,20 @@ mod tests {
             create_test_recorder(bank.clone(), blockstore, Some(poh_config), None);
         let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None);
         let cluster_info = Arc::new(cluster_info);
-        let _banking_stage = BankingStage::new_thread_local_multi_iterator(
+        let _banking_stage = BankingStage::new(
+            BlockProductionMethod::CentralScheduler,
             &cluster_info,
             &poh_recorder,
             non_vote_receiver,
             tpu_vote_receiver,
             gossip_vote_receiver,
             3,
             None,
             replay_vote_sender,
             None,
             Arc::new(ConnectionCache::new("connection_cache_test")),
             bank_forks,
             &Arc::new(PrioritizationFeeCache::new(0u64)),
+            false,
         );
 
         // wait for banking_stage to eat the packets
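
For existing callers, the test update above is the migration pattern: the dedicated constructor disappears, and BankingStage::new takes an explicit BlockProductionMethod up front plus a new trailing boolean (false in the test; reading it as a forwarding switch is an assumption, not confirmed by this diff). A condensed sketch of the same change outside the test, reusing the removed function's parameter names:

// Before (removed in this commit):
// let stage = BankingStage::new_thread_local_multi_iterator(
//     &cluster_info, &poh_recorder, non_vote_receiver, tpu_vote_receiver,
//     gossip_vote_receiver, num_threads, transaction_status_sender,
//     replay_vote_sender, log_messages_bytes_limit, connection_cache,
//     bank_forks, &prioritization_fee_cache,
// );

// After: the same arguments, with the production method first and the
// new trailing flag last (ordering follows the updated test above).
let stage = BankingStage::new(
    BlockProductionMethod::CentralScheduler,
    &cluster_info,
    &poh_recorder,
    non_vote_receiver,
    tpu_vote_receiver,
    gossip_vote_receiver,
    num_threads,
    transaction_status_sender,
    replay_vote_sender,
    log_messages_bytes_limit,
    connection_cache,
    bank_forks,
    &prioritization_fee_cache,
    false,
);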
