
pack pullresponses up to packet limit and outbound data budget #4712

Open · gregcusack wants to merge 5 commits into master from chunk-pullresponse-crdsvalues

Conversation


@gregcusack gregcusack commented Jan 30, 2025

Problem

PullResponses carry only one CrdsValue per message even though a packet can hold several. As a result, the validator sends 30-50% more PullResponse packets than necessary, as measured on testnet.

Summary of Changes

Pack each PullResponse(Pubkey, Vec<CrdsValue>) with as many CrdsValues as will fit, up to PULL_RESPONSE_MAX_PAYLOAD_SIZE bytes. The outbound PullResponse byte budget (ClusterInfo.outbound_budget) is left in effect.
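
Roughly, the packing loop looks like the sketch below (a sketch only, not the literal diff; names such as grouped_by_addr, self_pubkey, and packet_batch are assumed from the surrounding code, and error handling is elided):

// Pack each peer's CrdsValues into packet-sized PullResponses, stopping
// once the outbound byte budget is exhausted.
for (addr, values) in grouped_by_addr {
    for chunk in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, values) {
        let response = Protocol::PullResponse(self_pubkey, chunk);
        let num_bytes = bincode::serialized_size(&response).unwrap() as usize;
        if !self.outbound_budget.take(num_bytes) {
            self.stats.gossip_pull_request_no_budget.add_relaxed(1);
            break;
        }
        if let Ok(packet) = Packet::from_data(Some(&addr), &response) {
            packet_batch.push(packet);
        }
    }
}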

Data

By packing more CrdsValues per PullResponse, the validator reduces its outbound PullResponse count by up to ~33%. See packets_sent_pull_responses_count on the master branch (pre 19:42 UTC) vs this commit (post 19:42 UTC):
[screenshot: packets_sent_pull_responses_count, master vs this commit]

We can achieve this while maintaining a consistent PullRequest processing time: handle_batch_pull_requests_time. See:
[screenshot: handle_batch_pull_requests_time]

@gregcusack gregcusack force-pushed the chunk-pullresponse-crdsvalues branch 3 times, most recently from fe7b929 to b9f0f23 Compare January 30, 2025 21:27
@gregcusack gregcusack marked this pull request as ready for review January 30, 2025 21:50
let mut sent = 0;

// Group CrdsValues by peer address
let mut grouped_by_addr: HashMap<SocketAddr, Vec<CrdsValue>> = HashMap::new();


Can we have this hashmap be part of Self? This way its memory does not get reallocated every time this function is called.

gregcusack (Author):

So if we avoid cloning below and store references in grouped_by_addr: HashMap<SocketAddr, Vec<&CrdsValue>>, AND we want to store this hashmap as part of Self (aka ClusterInfo), then we have to deal with lifetime parameters for ClusterInfo, which would be a pain. We'd also have to take &mut self in handle_pull_requests(). So I'm not sure it's worth it, but open to more thoughts; I may be missing something here.

The other option is to not postpone the cloning as suggested below, and do:

struct ClusterInfo {
    ...
    grouped_by_addr: HashMap<SocketAddr, Vec<CrdsValue>>
}


I guess cloning is inevitable there since you'd need owned bytes later when forming the packets anyway. But retaining the hashmap in ClusterInfo would still be a nice improvement.

@gregcusack gregcusack Jan 31, 2025

The problem, though, is that in this setup we would be cloning all CrdsValues even if we don't send all of them out. So it may make more sense to wait to clone until we actually send the CrdsValues.


I guess instead of moving references to CRDS values around you could use indices into the original array. That would make the borrow checker a bit less cruel.
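
A minimal sketch of the index-based grouping (illustrative names, assuming responses is the vector of (address, CrdsValue) pairs built above):

// Group indices into `responses` by peer address; nothing is cloned here.
let mut grouped_by_addr: HashMap<SocketAddr, Vec<usize>> = HashMap::new();
for (i, (addr, _response)) in responses.iter().enumerate() {
    grouped_by_addr.entry(*addr).or_default().push(i);
}
// Later, clone only the values that actually get sent:
// let value = responses[idx].1.clone();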

@gregcusack gregcusack Jan 31, 2025

If we add grouped_by_addr into ClusterInfo, we then need to borrow ClusterInfo as mut all the way down. And ClusterInfo::listen() takes self: Arc<Self>, and we can't borrow Self as mutable through an Arc. We could do an Arc<Mutex<Self>>, but this seems like a lot of added complexity for a small benefit.

EDIT: sorry, I didn't see your message above before I responded to my own message. And ya, will check out using indices.

gregcusack (Author):

good call. updated!

grouped_by_addr
    .entry(**addr)
    .or_default()
    .push(response.clone());
@alexpyattaev alexpyattaev Jan 30, 2025

can we maybe consume values from responses to avoid clone here? Maybe store references in the hashmap?
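
For example (illustrative only), the hashmap could hold borrows, with the clone deferred to where the packets are actually built:

// Borrow while grouping; values that never fit under the outbound budget
// are never cloned.
let mut grouped_by_addr: HashMap<SocketAddr, Vec<&CrdsValue>> = HashMap::new();
for (addr, response) in responses.iter() {
    grouped_by_addr.entry(*addr).or_default().push(response);
}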

gregcusack (Author):

Good idea. Then we don't end up cloning any values that never get sent due to outbound_budget.

/// Max size of serialized crds-values in a Protocol::PullResponse packet. This
/// is equal to PACKET_DATA_SIZE minus serialized size of an empty pull
/// message: Protocol::PullResponse(Pubkey::default(), Vec::default())
pub(crate) const PULL_RESPONSE_MAX_PAYLOAD_SIZE: usize = PUSH_MESSAGE_MAX_PAYLOAD_SIZE;


I guess we need a test to confirm this is actually correct, same as for the other consts here.
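
Something along the lines of the neighboring payload-size tests should work (a minimal sketch; the test name is made up and it assumes the bincode serialization already used for Protocol):

#[test]
fn test_pull_response_max_payload_size() {
    // Serialized size of an empty pull response is the per-packet overhead.
    let header = Protocol::PullResponse(Pubkey::default(), Vec::default());
    let header_size = bincode::serialized_size(&header).unwrap() as usize;
    assert_eq!(PULL_RESPONSE_MAX_PAYLOAD_SIZE, PACKET_DATA_SIZE - header_size);
}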

@behzadnouri behzadnouri left a comment

able to reduce its outbound PullResponses by up to ~33%

It is fair that this reduces the "number" of pull-response packets (and the associated overhead), but it sends out bigger packets.
Do you have an estimate of how much it reduces the number of "bytes" sent out?

Comment on lines 1812 to 1813
for (addr, crds_values) in grouped_by_addr {
    for chunk in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, crds_values) {


It looks to me like this is no longer prioritizing responses by the scores calculated earlier.

gregcusack (Author):

Oh ya, you're right. grouped_by_addr is a hashmap, so it undoes the shuffle. Will fix.

Comment on lines 1777 to 1786
     let age = now.saturating_sub(response.wallclock());
-    let score = DEFAULT_EPOCH_DURATION_MS
+    let mut score = DEFAULT_EPOCH_DURATION_MS
         .saturating_sub(age)
         .div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
         .max(1);
-    let score = if stakes.contains_key(&response.pubkey()) {
-        2 * score
-    } else {
-        score
-    };
+    if stakes.contains_key(&response.pubkey()) {
+        score *= 2;
+    }
-    let score = match response.data() {
-        CrdsData::ContactInfo(_) => 2 * score,
-        _ => score,
+    if let CrdsData::ContactInfo(_) = response.data() {
+        score *= 2;


This seems like a noop refactor?

gregcusack (Author):

Ya, it is. I can put it into another PR or just leave it as it was before. I thought the old code was unnecessarily wordy.


Separate PR (or bailing on it) is fine. It just muddies this change set.

        };
        ((addr, response), score)
    })
    .unzip();
if responses.is_empty() {
    return packet_batch;
}


superfluous

} else {
self.stats.gossip_pull_request_no_budget.add_relaxed(1);
break;
grouped_by_addr
@t-nelson t-nelson Jan 31, 2025

Is this not just redoing what the flat_map undid above? Looks like the zip/unzip to addr above isn't really necessary. I think we should be doing the batching before the score-and-sort, and give each batch the highest score of its member entries.

@gregcusack gregcusack Jan 31, 2025

Ya, good point. Will fix. We could even score CrdsValue batches in filter_crds_values() when putting together pull responses:

fn filter_crds_values(
    thread_pool: &ThreadPool,
    crds: &RwLock<Crds>,
    filters: &[(CrdsValue, CrdsFilter)],
    output_size_limit: usize, // Limit number of crds values returned.
    now: u64,
    // Predicate returning false if the CRDS value should be discarded.
    should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
    stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> {
    let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
    let jitter = rand::thread_rng().gen_range(0..msg_timeout / 4);
    // Skip filters from callers that are too old.
    let caller_wallclock_window =
        now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
    let dropped_requests = AtomicUsize::default();
    let total_skipped = AtomicUsize::default();
    let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX);
    let output_size_limit = AtomicI64::new(output_size_limit);
    let crds = crds.read().unwrap();
    let apply_filter = |caller: &CrdsValue, filter: &CrdsFilter| {
        if output_size_limit.load(Ordering::Relaxed) <= 0 {
            return Vec::default();
        }
        let caller_wallclock = caller.wallclock();
        if !caller_wallclock_window.contains(&caller_wallclock) {
            dropped_requests.fetch_add(1, Ordering::Relaxed);
            return Vec::default();
        }
        let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
        let pred = |entry: &&VersionedCrdsValue| {
            debug_assert!(filter.test_mask(entry.value.hash()));
            // Skip values that are too new.
            if entry.value.wallclock() > caller_wallclock {
                total_skipped.fetch_add(1, Ordering::Relaxed);
                false
            } else {
                !filter.filter_contains(entry.value.hash())
                    && should_retain_crds_value(&entry.value)
            }
        };
        let out: Vec<_> = crds
            .filter_bitmask(filter.mask, filter.mask_bits)
            .filter(pred)
            .map(|entry| entry.value.clone())
            .take(output_size_limit.load(Ordering::Relaxed).max(0) as usize)
            .collect();
        output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
        out
    };
    let ret: Vec<_> = thread_pool.install(|| {
        filters
            .par_iter()
            .map(|(caller, filter)| apply_filter(caller, filter))
            .collect()
    });
    stats
        .filter_crds_values_dropped_requests
        .add_relaxed(dropped_requests.into_inner() as u64);
    stats
        .filter_crds_values_dropped_values
        .add_relaxed(total_skipped.into_inner() as u64);
    ret
}
And then we'd return a Vec<(Vec<CrdsValue>, batch_score)>. That way we don't have to loop through all of the CrdsValues again right after we just batched them. It would probably make sense to put this in a separate PR.

@gregcusack (Author):

able to reduce its outbound PullResponses by up to ~33%

It is fair that this reduces "number" of pull-response packets (and its associated overhead), but it sends out bigger packets. Do you have an estimate how much it reduces number of "bytes" sent out?

It doesn't reduce the number of bytes sent out, which makes sense since we're still capping the sent bytes by the outbound_budget. So we end up just sending fewer packets but bigger packets, which is generally more efficient for sending/receiving. Here is the change in sent bytes (left is this PR, right is master):
[screenshot: sent bytes, this PR (left) vs master (right)]

@gregcusack gregcusack force-pushed the chunk-pullresponse-crdsvalues branch from 934cb5f to 0df157b Compare January 31, 2025 17:11
@gregcusack gregcusack force-pushed the chunk-pullresponse-crdsvalues branch from 0df157b to 55dd042 Compare January 31, 2025 18:43
                }
            }
        }
    }
    time.stop();
-   let dropped_responses = responses.len() - sent;
+   let dropped_responses = total_crds_values.saturating_sub(sent_crds_values);
    self.stats
        .gossip_pull_request_sent_requests


The stats variable name does not seem to match what we are storing there...

gregcusack (Author):

Currently, there is one CrdsValue sent per PullResponse, so dropped PullResponses == dropped CrdsValues. But now that we are packing CrdsValues into PullResponses, dropped PullResponses != dropped CrdsValues. I think it is more valuable to track the actual dropped CrdsValues.

I can change this variable to dropped_crds_values?

-    2 * score
-} else {
+let mut total_crds_values = 0;
 // send pull_responses[i] to addrs[i]


I guess you meant "send pull_responses[i.0] to addrs[i.1]", right?

gregcusack (Author):

What are i.0 and i.1? pull_responses[i]: Vec<CrdsValue> and addrs[i]: SocketAddr. We want to send the Vec<CrdsValue> to the SocketAddr.

@t-nelson:

it doesn't reduce the number of bytes sent out, which makes sense since we're still capping the sent bytes by the outbound_budget. So we end up just sending fewer packets but bigger packets, which is generally more efficient for sending/receiving.

Oh, the goal here was only to reduce network ops? I thought we were only going to sign the batches once to save 96 * (batch.len() - 1) bytes and reduce signing/sigverify by batch.len() - 1.

@gregcusack (Author):

it doesn't reduce the number of bytes sent out, which makes sense since we're still capping the sent bytes by the outbound_budget. So we end up just sending fewer packets but bigger packets, which is generally more efficient for sending/receiving.

Oh, the goal here was only to reduce network ops? I thought we were only going to sign the batches once to save 96 * (batch.len() - 1) bytes and reduce signing/sigverify by batch.len() - 1.

Unfortunately, yes. The node sending the PullResponse doesn't sign any of the CrdsValues. Each CrdsValue is signed by whatever node created the CrdsValue. The node sending the PullResponse is essentially just forwarding the already signed CrdsValues.

Currently the node sending the PullResponse sends one PullResponse per CrdsValue it wants to send to its peers, but a PullResponse can typically hold more than one CrdsValue. So now we're just packing more CrdsValues per PullResponse. While the number of bytes over the network hasn't decreased, the actual "goodput" (the useful data we're sending) has increased. This means that technically we could reduce the outbound byte budget, since more of the data we're sending is now useful data rather than just packet headers.

    return packet_batch;
}
let scores: Vec<u64> = batches.iter().map(|(_, score)| *score).collect();

We can avoid this collect if we first rework WeightedShuffle to something like:

impl<T> WeightedShuffle<T>
where
    T: Copy + ConstZero,
{
    fn new_by_key<F, W>(name: &'static str, items: &[T], weightf: F) -> Self
    where
        F: Fn(&T) -> &W,
        W: PartialOrd + AddAssign + CheckedAdd,
    {
         // modify current `fn new()` body to call `weightf()` on each item
    }
    
    fn new(name: &'static str, items: &[T]) -> Self {
        // this probably needs some fuckery to satisfy generics
        Self::new_by_key(name, items, |i| i)
    }
}


Please let's not change weighted-shuffle in this same PR.
weighted-shuffle is in the hot path of turbine and is very performance sensitive; we need to test and scrutinize any changes to it in isolation.


yes. if we change WeightedShuffle as suggested here, it should be done in another PR in advance of these changes

let crds_values = &pull_responses[*batch_index];
let addr = &addrs[*batch_index];
for chunk_refs in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, crds_values) {
    let chunk: Vec<CrdsValue> = chunk_refs.into_iter().cloned().collect();

I think this collect can go away if we make the iter() on line 1775 an into_iter().

gregcusack (Author):

if we change this line:

for (batch_index, crds_values) in pull_responses.iter().enumerate() {

to use into_iter(), then we are consuming pull_responses and then wouldn't be able to index into pull_responses later here:

let crds_values = &pull_responses[*batch_index];

right?


the WeightedShuffle change is a prereq. then we're iterating directly over the pre-shuffled pull responses and don't need to do the indexing at all

gregcusack (Author):

ahh ok got it

Comment on lines 1848 to 1852
"handle_pull_requests: {} sent pull responses: {} total crds values: {} sent_crds_values: {} total_bytes: {}",
time,
sent,
responses.len(),
sent_pull_responses,
total_crds_values,
sent_crds_values,


https://rust-lang.github.io/rust-clippy/master/index.html#uninlined_format_args

Though this debug! seems left over from early days of the code, and does not seem very useful.
If you think removing it will make the code simpler, feel free to remove it entirely.

gregcusack (Author):

removed

@@ -108,6 +108,7 @@ pub struct GossipStats {
     pub(crate) gossip_pull_request_dropped_requests: Counter,
    pub(crate) gossip_pull_request_no_budget: Counter,
    pub(crate) gossip_pull_request_sent_requests: Counter,
+    pub(crate) gossip_pull_request_sent_bytes: Counter,


Please keep these sorted.
It will be more readable and easier to skim through.

gregcusack (Author):

done

    continue;
}
// batch-level score. max value of batch
let batch_score = crds_values
@behzadnouri behzadnouri Feb 4, 2025

Is this the maximum score of all crds values returned to some address?

You might have hundreds of crds values sent to some address, and if you just take their max score for all those values, then you might send a lot of PullResponses where all of their inner crds values have very low priority.

I think you would need to first split crds values into chunks which fit into a PullResponse, then take the max score of each of those chunks, i.e. one score associated with each PullResponse (or each Vec which fits in a PullResponse), not a single score associated with an addr.
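
A rough sketch of that (the score_crds_value helper and the exact types are hypothetical; split_gossip_messages is the existing chunking helper):

// One score per packet-sized chunk: the max score of the chunk's members.
let mut scored_chunks: Vec<(SocketAddr, Vec<CrdsValue>, u64)> = Vec::new();
for (values, addr) in pull_responses.into_iter().zip(addrs) {
    for chunk in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, values) {
        let score = chunk.iter().map(score_crds_value).max().unwrap_or(1);
        scored_chunks.push((addr, chunk, score));
    }
}
// scored_chunks can then be weighted-shuffled by score and trimmed to the
// outbound byte budget.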

gregcusack (Author):

good idea. updated

Comment on lines 1784 to 1793
let mut score = DEFAULT_EPOCH_DURATION_MS
    .saturating_sub(age)
    .div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
    .max(1);
if stakes.contains_key(&value.pubkey()) {
    score *= 2;
}
if let CrdsData::ContactInfo(_) = value.data() {
    score *= 2;
}


The old code was intentionally avoiding mut.
Maybe instead

--- a/gossip/src/cluster_info.rs
+++ b/gossip/src/cluster_info.rs
@@ -1777,16 +1777,11 @@ impl ClusterInfo {
                 let score = DEFAULT_EPOCH_DURATION_MS
                     .saturating_sub(age)
                     .div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
-                    .max(1);
-                let score = if stakes.contains_key(&response.pubkey()) {
-                    2 * score
-                } else {
-                    score
-                };
-                let score = match response.data() {
-                    CrdsData::ContactInfo(_) => 2 * score,
-                    _ => score,
-                };
+                    .max(1)
+                    .saturating_mul(1 + u64::from(stakes.contains_key(&response.pubkey())))
+                    .saturating_mul(
+                        1 + u64::from(matches!(response.data(), CrdsData::ContactInfo(_))),
+                    );
                 ((addr, response), score)
             })
             .unzip();

gregcusack (Author):

updated and added comment for clarity

-let score = if stakes.contains_key(&response.pubkey()) {
-    2 * score
-} else {
+let mut total_crds_values = 0;


maybe num_crds_values instead of total_....

@gregcusack gregcusack force-pushed the chunk-pullresponse-crdsvalues branch from 5c515f5 to b820391 Compare February 4, 2025 05:45