From b8203910a9239327a4a9e90ac18b07e74fcfa56b Mon Sep 17 00:00:00 2001 From: greg Date: Tue, 4 Feb 2025 05:40:23 +0000 Subject: [PATCH] score batches after pull responses are chunked --- gossip/src/cluster_info.rs | 117 ++++++++++++++--------------- gossip/src/cluster_info_metrics.rs | 2 +- 2 files changed, 59 insertions(+), 60 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index ee34e45981fd41..faed7f4c2cd09b 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -49,7 +49,6 @@ use { rand::{seq::SliceRandom, CryptoRng, Rng}, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, solana_ledger::shred::Shred, - solana_measure::measure::Measure, solana_net_utils::{ bind_common_in_range_with_config, bind_common_with_config, bind_in_range, bind_in_range_with_config, bind_more_with_config, bind_to_localhost, bind_to_unspecified, @@ -1737,7 +1736,6 @@ impl ClusterInfo { stakes: &HashMap, ) -> PacketBatch { const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT; - let mut time = Measure::start("handle_pull_requests"); let output_size_limit = self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; let mut packet_batch = @@ -1769,72 +1767,81 @@ impl ClusterInfo { &self.stats, ) }; - let mut total_crds_values = 0; - // send pull_responses[i] to addrs[i] - let mut batches: Vec<(usize, u64)> = Vec::with_capacity(pull_responses.len()); + struct Chunk<'a> { + batch_index: usize, + values: Vec<&'a CrdsValue>, + score: u64, + } + let mut num_crds_values = 0; + let mut chunks = Vec::with_capacity(pull_responses.len()); for (batch_index, crds_values) in pull_responses.iter().enumerate() { if crds_values.is_empty() { continue; } - // batch-level score. 
max value of batch - let batch_score = crds_values - .iter() - .map(|value| { - let age = now.saturating_sub(value.wallclock()); - let mut score = DEFAULT_EPOCH_DURATION_MS - .saturating_sub(age) - .div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS) - .max(1); - if stakes.contains_key(&value.pubkey()) { - score *= 2; - } - if let CrdsData::ContactInfo(_) = value.data() { - score *= 2; - } - score - }) - .max() - .unwrap_or_default(); + num_crds_values += crds_values.len(); + + for chunk_values in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, crds_values) { + let chunk_score = chunk_values + .iter() + .map(|value| { + let age = now.saturating_sub(value.wallclock()); + // score CrdsValue: 2x score if staked; 2x score if ContactInfo + DEFAULT_EPOCH_DURATION_MS + .saturating_sub(age) + .div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS) + .max(1) + .saturating_mul(1 + u64::from(stakes.contains_key(&value.pubkey()))) + .saturating_mul( + 1 + u64::from(matches!(value.data(), CrdsData::ContactInfo(_))) + ) + }) + .max() + .unwrap_or_default(); - total_crds_values += crds_values.len(); - batches.push((batch_index, batch_score)); + chunks.push(Chunk { + batch_index, + values: chunk_values, + score: chunk_score, + }); + } } - if batches.is_empty() { + if chunks.is_empty() { return packet_batch; } - let scores: Vec<u64> = batches.iter().map(|(_, score)| *score).collect(); + let scores: Vec<u64> = chunks.iter().map(|c| c.score).collect(); let mut rng = rand::thread_rng(); let shuffle = WeightedShuffle::new("handle-pull-requests", &scores).shuffle(&mut rng); let mut total_bytes = 0; let mut sent_pull_responses = 0; let mut sent_crds_values = 0; - for (batch_index, _) in shuffle.map(|i: usize| &batches[i]) { - let crds_values = &pull_responses[*batch_index]; - let addr = &addrs[*batch_index]; - for chunk_refs in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, crds_values) { - let chunk: Vec<CrdsValue> = chunk_refs.into_iter().cloned().collect(); - let chunk_len = chunk.len(); - let response = 
Protocol::PullResponse(self_id, chunk); - match Packet::from_data(Some(addr), response) { - Err(err) => { - error!("failed to write pull-response packet: {:?}", err); - } - Ok(packet) => { - if self.outbound_budget.take(packet.meta().size) { - total_bytes += packet.meta().size; - packet_batch.push(packet); - sent_pull_responses += 1; - sent_crds_values += chunk_len; - } else { - self.stats.gossip_pull_request_no_budget.add_relaxed(1); - break; - } + for chunk in shuffle.map(|i: usize| &chunks[i]) { + let Chunk { + batch_index: addr_index, + values, + .. + } = chunk; + let addr = &addrs[*addr_index]; + let chunk_values: Vec<CrdsValue> = values.iter().map(|v| (*v).clone()).collect(); + let response = Protocol::PullResponse(self_id, chunk_values); + match Packet::from_data(Some(addr), response) { + Err(err) => { + error!("failed to write pull-response packet: {:?}", err); + } + Ok(packet) => { + let packet_size = packet.meta().size; + if self.outbound_budget.take(packet_size) { + total_bytes += packet_size; + packet_batch.push(packet); + sent_pull_responses += 1; + sent_crds_values += values.len(); + } else { + self.stats.gossip_pull_request_no_budget.add_relaxed(1); + break; } } } } - time.stop(); - let dropped_responses = total_crds_values.saturating_sub(sent_crds_values); + let dropped_responses = num_crds_values.saturating_sub(sent_crds_values); self.stats .gossip_pull_request_sent_requests .add_relaxed(sent_pull_responses as u64); @@ -1844,14 +1851,6 @@ impl ClusterInfo { self.stats .gossip_pull_request_sent_bytes .add_relaxed(total_bytes as u64); - debug!( - "handle_pull_requests: {} sent pull responses: {} total crds values: {} sent_crds_values: {} total_bytes: {}", - time, - sent_pull_responses, - total_crds_values, - sent_crds_values, - total_bytes - ); packet_batch } diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 483a4e2551dfed..9978736a7b0696 100644 --- a/gossip/src/cluster_info_metrics.rs +++ 
b/gossip/src/cluster_info_metrics.rs @@ -107,8 +107,8 @@ pub struct GossipStats { pub(crate) gossip_packets_dropped_count: Counter, pub(crate) gossip_pull_request_dropped_requests: Counter, pub(crate) gossip_pull_request_no_budget: Counter, - pub(crate) gossip_pull_request_sent_requests: Counter, pub(crate) gossip_pull_request_sent_bytes: Counter, + pub(crate) gossip_pull_request_sent_requests: Counter, pub(crate) gossip_transmit_loop_iterations_since_last_report: Counter, pub(crate) gossip_transmit_loop_time: Counter, pub(crate) handle_batch_ping_messages_time: Counter,