Skip to content

Commit

Permalink
score batches after pull responses are chunked
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Feb 4, 2025
1 parent 1d0c0ff commit b820391
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 60 deletions.
117 changes: 58 additions & 59 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use {
rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_ledger::shred::Shred,
solana_measure::measure::Measure,
solana_net_utils::{
bind_common_in_range_with_config, bind_common_with_config, bind_in_range,
bind_in_range_with_config, bind_more_with_config, bind_to_localhost, bind_to_unspecified,
Expand Down Expand Up @@ -1737,7 +1736,6 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>,
) -> PacketBatch {
const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT;
let mut time = Measure::start("handle_pull_requests");
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packet_batch =
Expand Down Expand Up @@ -1769,72 +1767,81 @@ impl ClusterInfo {
&self.stats,
)
};
let mut total_crds_values = 0;
// send pull_responses[i] to addrs[i]
let mut batches: Vec<(usize, u64)> = Vec::with_capacity(pull_responses.len());
// One packet-sized piece of a pull response, as produced by
// `split_gossip_messages`. Each chunk carries its own score so that chunks,
// rather than whole batches, are the unit fed to the weighted shuffle.
struct Chunk<'a> {
// Index of the originating entry in `pull_responses`; also used to look up
// the destination address in `addrs`.
batch_index: usize,
// Borrowed CRDS values that make up this chunk; they are cloned only when
// the pull-response is actually built.
values: Vec<&'a CrdsValue>,
// Highest per-value score among `values`; used as this chunk's shuffle weight.
score: u64,
}
let mut num_crds_values = 0;
let mut chunks = Vec::with_capacity(pull_responses.len());
for (batch_index, crds_values) in pull_responses.iter().enumerate() {
if crds_values.is_empty() {
continue;
}
// batch-level score. max value of batch
let batch_score = crds_values
.iter()
.map(|value| {
let age = now.saturating_sub(value.wallclock());
let mut score = DEFAULT_EPOCH_DURATION_MS
.saturating_sub(age)
.div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
.max(1);
if stakes.contains_key(&value.pubkey()) {
score *= 2;
}
if let CrdsData::ContactInfo(_) = value.data() {
score *= 2;
}
score
})
.max()
.unwrap_or_default();
num_crds_values += crds_values.len();

for chunk_values in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, crds_values) {
let chunk_score = chunk_values
.iter()
.map(|value| {
let age = now.saturating_sub(value.wallclock());
// score CrdsValue: 2x score if staked; 2x score if ContactInfo
DEFAULT_EPOCH_DURATION_MS
.saturating_sub(age)
.div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
.max(1)
.saturating_mul(1 + u64::from(stakes.contains_key(&value.pubkey())))
.saturating_mul(
1 + u64::from(matches!(value.data(), CrdsData::ContactInfo(_)))
)
})
.max()
.unwrap_or_default();

total_crds_values += crds_values.len();
batches.push((batch_index, batch_score));
chunks.push(Chunk {
batch_index,
values: chunk_values,
score: chunk_score,
});
}
}
if batches.is_empty() {
if chunks.is_empty() {
return packet_batch;
}
let scores: Vec<u64> = batches.iter().map(|(_, score)| *score).collect();
let scores: Vec<u64> = chunks.iter().map(|c| c.score).collect();
let mut rng = rand::thread_rng();
let shuffle = WeightedShuffle::new("handle-pull-requests", &scores).shuffle(&mut rng);
let mut total_bytes = 0;
let mut sent_pull_responses = 0;
let mut sent_crds_values = 0;
for (batch_index, _) in shuffle.map(|i: usize| &batches[i]) {
let crds_values = &pull_responses[*batch_index];
let addr = &addrs[*batch_index];
for chunk_refs in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, crds_values) {
let chunk: Vec<CrdsValue> = chunk_refs.into_iter().cloned().collect();
let chunk_len = chunk.len();
let response = Protocol::PullResponse(self_id, chunk);
match Packet::from_data(Some(addr), response) {
Err(err) => {
error!("failed to write pull-response packet: {:?}", err);
}
Ok(packet) => {
if self.outbound_budget.take(packet.meta().size) {
total_bytes += packet.meta().size;
packet_batch.push(packet);
sent_pull_responses += 1;
sent_crds_values += chunk_len;
} else {
self.stats.gossip_pull_request_no_budget.add_relaxed(1);
break;
}
for chunk in shuffle.map(|i: usize| &chunks[i]) {
let Chunk {
batch_index: addr_index,
values,
..
} = chunk;
let addr = &addrs[*addr_index];
let chunk_values: Vec<CrdsValue> = values.iter().map(|v| (*v).clone()).collect();
let response = Protocol::PullResponse(self_id, chunk_values);
match Packet::from_data(Some(addr), response) {
Err(err) => {
error!("failed to write pull-response packet: {:?}", err);
}
Ok(packet) => {
let packet_size = packet.meta().size;
if self.outbound_budget.take(packet_size) {
total_bytes += packet_size;
packet_batch.push(packet);
sent_pull_responses += 1;
sent_crds_values += values.len();
} else {
self.stats.gossip_pull_request_no_budget.add_relaxed(1);
break;
}
}
}
}
time.stop();
let dropped_responses = total_crds_values.saturating_sub(sent_crds_values);
let dropped_responses = num_crds_values.saturating_sub(sent_crds_values);
self.stats
.gossip_pull_request_sent_requests
.add_relaxed(sent_pull_responses as u64);
Expand All @@ -1844,14 +1851,6 @@ impl ClusterInfo {
self.stats
.gossip_pull_request_sent_bytes
.add_relaxed(total_bytes as u64);
debug!(
"handle_pull_requests: {} sent pull responses: {} total crds values: {} sent_crds_values: {} total_bytes: {}",
time,
sent_pull_responses,
total_crds_values,
sent_crds_values,
total_bytes
);
packet_batch
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ pub struct GossipStats {
pub(crate) gossip_packets_dropped_count: Counter,
pub(crate) gossip_pull_request_dropped_requests: Counter,
pub(crate) gossip_pull_request_no_budget: Counter,
pub(crate) gossip_pull_request_sent_requests: Counter,
pub(crate) gossip_pull_request_sent_bytes: Counter,
pub(crate) gossip_pull_request_sent_requests: Counter,
pub(crate) gossip_transmit_loop_iterations_since_last_report: Counter,
pub(crate) gossip_transmit_loop_time: Counter,
pub(crate) handle_batch_ping_messages_time: Counter,
Expand Down

0 comments on commit b820391

Please sign in to comment.