diff --git a/codec/src/codec.rs b/codec/src/codec.rs index 1c678993bb..31c31adfaf 100644 --- a/codec/src/codec.rs +++ b/codec/src/codec.rs @@ -1,7 +1,7 @@ //! Core traits for encoding and decoding. use crate::error::Error; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; /// Trait for types with a known, fixed encoded size. /// @@ -63,6 +63,20 @@ pub trait Read: Sized { /// This trait provides the convenience [Encode::encode] method which handles /// buffer allocation, writing, and size assertion in one go. pub trait Encode: Write + EncodeSize { + /// Encodes `self` into a new [Bytes] buffer. + /// + /// This method calculates the required size using [EncodeSize::encode_size], allocates a + /// buffer of that exact capacity, writes the value using [Write::write], and performs a + /// sanity check assertion. + /// + /// # Panics + /// + /// Panics if `encode_size()` does not return the same number of bytes actually written by + /// `write()` + fn encode(&self) -> Bytes { + self.encode_mut().freeze() + } + /// Encodes `self` into a new [BytesMut] buffer. /// /// This method calculates the required size using [EncodeSize::encode_size], allocates a @@ -73,7 +87,7 @@ pub trait Encode: Write + EncodeSize { /// /// Panics if `encode_size()` does not return the same number of bytes actually written by /// `write()` - fn encode(&self) -> BytesMut { + fn encode_mut(&self) -> BytesMut { let len = self.encode_size(); let mut buffer = BytesMut::with_capacity(len); self.write(&mut buffer); diff --git a/codec/src/types/hash_map.rs b/codec/src/types/hash_map.rs index 569c5d5f96..4f0224485a 100644 --- a/codec/src/types/hash_map.rs +++ b/codec/src/types/hash_map.rs @@ -302,7 +302,7 @@ mod tests { let mut map = HashMap::new(); map.insert(1u32, 100u64); - let mut encoded = map.encode(); + let mut encoded = map.encode_mut(); encoded.put_u8(0xFF); // Add extra byte // Use decode_cfg which enforces buffer is fully consumed diff --git a/codec/src/types/hash_set.rs b/codec/src/types/hash_set.rs index a0ff22c7f5..8cb3f9a0e5 100644 --- a/codec/src/types/hash_set.rs +++ b/codec/src/types/hash_set.rs @@ -253,7 +253,7 @@ mod tests { let mut set = HashSet::new(); set.insert(1u32); - let mut encoded = set.encode(); + let mut encoded = set.encode_mut(); encoded.put_u8(0xFF); // Add extra byte // Use decode_cfg which enforces buffer is fully consumed diff --git a/collector/fuzz/fuzz_targets/collector.rs b/collector/fuzz/fuzz_targets/collector.rs index d3b3afccd6..528f199081 100644 --- a/collector/fuzz/fuzz_targets/collector.rs +++ b/collector/fuzz/fuzz_targets/collector.rs @@ -231,7 +231,7 @@ impl CheckedSender for MockCheckedSender { async fn send( self, - _message: bytes::Bytes, + _message: impl bytes::Buf + Send, _priority: bool, ) -> Result, Self::Error> { Ok(vec![]) diff --git a/collector/src/p2p/mocks/sender.rs b/collector/src/p2p/mocks/sender.rs index 28b1bfdd24..672cdb2b8c 100644 --- a/collector/src/p2p/mocks/sender.rs +++ b/collector/src/p2p/mocks/sender.rs @@ -1,6 +1,6 @@ //! Mock sender implementations for testing. -use bytes::Bytes; +use bytes::Buf; use commonware_cryptography::PublicKey; use commonware_p2p::{CheckedSender, LimitedSender, Recipients}; use std::time::SystemTime; @@ -47,7 +47,7 @@ impl CheckedSender for CheckedFailing

{ type PublicKey = P; type Error = Error; - async fn send(self, _message: Bytes, _priority: bool) -> Result, Self::Error> { + async fn send(self, _message: impl Buf + Send, _priority: bool) -> Result, Self::Error> { Err(Error::Failed) } } diff --git a/collector/src/p2p/mod.rs b/collector/src/p2p/mod.rs index f4b9a0fcc8..e4e04ae594 100644 --- a/collector/src/p2p/mod.rs +++ b/collector/src/p2p/mod.rs @@ -828,7 +828,7 @@ mod tests { .0 .send( Recipients::One(peers[0].clone()), - response_to_peer1.encode().into(), + response_to_peer1.encode(), true, ) .await diff --git a/consensus/fuzz/src/disrupter.rs b/consensus/fuzz/src/disrupter.rs index 21e9d3ba04..22b4a9a015 100644 --- a/consensus/fuzz/src/disrupter.rs +++ b/consensus/fuzz/src/disrupter.rs @@ -216,11 +216,11 @@ where Vote::Notarize(notarize) => { if self.fuzz_input.random_bool() { let mutated = self.mutate_bytes(&msg); - let _ = sender.send(Recipients::All, mutated.into(), true).await; + let _ = sender.send(Recipients::All, &mutated[..], true).await; } else { let proposal = self.mutate_proposal(¬arize.proposal); if let Some(v) = Notarize::sign(&self.scheme, proposal) { - let msg = Vote::::Notarize(v).encode().into(); + let msg = Vote::::Notarize(v).encode(); let _ = sender.send(Recipients::All, msg, true).await; } } @@ -228,11 +228,11 @@ where Vote::Finalize(finalize) => { if self.fuzz_input.random_bool() { let mutated = self.mutate_bytes(&msg); - let _ = sender.send(Recipients::All, mutated.into(), true).await; + let _ = sender.send(Recipients::All, &mutated[..], true).await; } else { let proposal = self.mutate_proposal(&finalize.proposal); if let Some(v) = Finalize::sign(&self.scheme, proposal) { - let msg = Vote::::Finalize(v).encode().into(); + let msg = Vote::::Finalize(v).encode(); let _ = sender.send(Recipients::All, msg, true).await; } } @@ -240,12 +240,12 @@ where Vote::Nullify(_) => { if self.fuzz_input.random_bool() { let mutated = self.mutate_bytes(&msg); - let _ = sender.send(Recipients::All, mutated.into(), true).await; + let _ = sender.send(Recipients::All, &mutated[..], true).await; } else { let v = self.random_view(self.last_vote); let round = Round::new(Epoch::new(EPOCH), View::new(v)); if let Some(v) = Nullify::::sign::(&self.scheme, round) { - let msg = Vote::::Nullify(v).encode().into(); + let msg = Vote::::Nullify(v).encode(); let _ = sender.send(Recipients::All, msg, true).await; } } @@ -280,7 +280,7 @@ where // Optionally send mutated certificate if self.fuzz_input.random_bool() { let mutated = self.mutate_bytes(&msg); - let _ = sender.send(Recipients::All, mutated.into(), true).await; + let _ = sender.send(Recipients::All, &mutated[..], true).await; } } @@ -327,20 +327,20 @@ where if self.participants.index(&self.validator).is_none() { let bytes = self.bytes(); - let _ = sender.send(Recipients::All, bytes.into(), true).await; + let _ = sender.send(Recipients::All, &bytes[..], true).await; return; } match self.message() { Message::Notarize => { if let Some(vote) = Notarize::sign(&self.scheme, proposal) { - let msg = Vote::::Notarize(vote).encode().into(); + let msg = Vote::::Notarize(vote).encode(); let _ = sender.send(Recipients::All, msg, true).await; } } Message::Finalize => { if let Some(vote) = Finalize::sign(&self.scheme, proposal) { - let msg = Vote::::Finalize(vote).encode().into(); + let msg = Vote::::Finalize(vote).encode(); let _ = sender.send(Recipients::All, msg, true).await; } } @@ -350,13 +350,13 @@ where View::new(self.random_view(self.last_vote)), ); if let Some(vote) = Nullify::::sign::(&self.scheme, round) { - let msg = Vote::::Nullify(vote).encode().into(); + let msg = Vote::::Nullify(vote).encode(); let _ = sender.send(Recipients::All, msg, true).await; } } Message::Random => { let bytes = self.bytes(); - let _ = sender.send(Recipients::All, bytes.into(), true).await; + let _ = sender.send(Recipients::All, &bytes[..], true).await; } } } diff --git a/consensus/src/aggregation/types.rs b/consensus/src/aggregation/types.rs index 26f917e9fd..3fd5bb7931 100644 --- a/consensus/src/aggregation/types.rs +++ b/consensus/src/aggregation/types.rs @@ -148,7 +148,7 @@ impl Subject for &Item { } fn message(&self) -> Bytes { - self.encode().freeze() + self.encode() } } diff --git a/consensus/src/marshal/actor.rs b/consensus/src/marshal/actor.rs index 7bb1be6e1e..0a8d917555 100644 --- a/consensus/src/marshal/actor.rs +++ b/consensus/src/marshal/actor.rs @@ -553,7 +553,7 @@ where debug!(?commitment, "block missing on request"); continue; }; - let _ = response.send(block.encode().into()); + let _ = response.send(block.encode()); } Request::Finalized { height } => { // Get finalization @@ -569,7 +569,7 @@ where }; // Send finalization - let _ = response.send((finalization, block).encode().into()); + let _ = response.send((finalization, block).encode()); } Request::Notarized { round } => { // Get notarization @@ -584,7 +584,7 @@ where debug!(?commitment, "block missing on request"); continue; }; - let _ = response.send((notarization, block).encode().into()); + let _ = response.send((notarization, block).encode()); } } }, diff --git a/consensus/src/ordered_broadcast/engine.rs b/consensus/src/ordered_broadcast/engine.rs index d20672daa6..3c11c7f73c 100644 --- a/consensus/src/ordered_broadcast/engine.rs +++ b/consensus/src/ordered_broadcast/engine.rs @@ -838,7 +838,7 @@ impl< node_sender .send( Recipients::Some(validators.iter().cloned().collect()), - node.encode().into(), + node.encode(), self.priority_proposals, ) .await diff --git a/consensus/src/simplex/actors/batcher/mod.rs b/consensus/src/simplex/actors/batcher/mod.rs index f18147d5da..8cc7a8e1aa 100644 --- a/consensus/src/simplex/actors/batcher/mod.rs +++ b/consensus/src/simplex/actors/batcher/mod.rs @@ -206,7 +206,7 @@ mod tests { injector_sender .send( Recipients::One(me.clone()), - Certificate::Notarization(notarization.clone()).encode().into(), + Certificate::Notarization(notarization.clone()).encode(), true, ) .await @@ -224,8 +224,7 @@ mod tests { .send( Recipients::One(me.clone()), Certificate::::Nullification(nullification.clone()) - .encode() - .into(), + .encode(), true, ) .await @@ -242,7 +241,7 @@ mod tests { injector_sender .send( Recipients::One(me.clone()), - Certificate::Finalization(finalization.clone()).encode().into(), + Certificate::Finalization(finalization.clone()).encode(), true, ) .await @@ -372,7 +371,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(vote).encode().into(), + Vote::Notarize(vote).encode(), true, ) .await @@ -527,7 +526,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(vote).encode().into(), + Vote::Notarize(vote).encode(), true, ) .await @@ -550,7 +549,7 @@ mod tests { injector_sender .send( Recipients::One(me.clone()), - Certificate::Notarization(notarization.clone()).encode().into(), + Certificate::Notarization(notarization.clone()).encode(), true, ) .await @@ -572,7 +571,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(last_vote).encode().into(), + Vote::Notarize(last_vote).encode(), true, ) .await @@ -706,7 +705,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(leader_vote).encode().into(), + Vote::Notarize(leader_vote).encode(), true, ) .await @@ -731,7 +730,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(vote).encode().into(), + Vote::Notarize(vote).encode(), true, ) .await @@ -770,7 +769,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(vote6).encode().into(), + Vote::Notarize(vote6).encode(), true, ) .await @@ -902,7 +901,7 @@ mod tests { leader_sender .send( Recipients::One(me.clone()), - Vote::Notarize(leader_vote).encode().into(), + Vote::Notarize(leader_vote).encode(), true, ) .await @@ -1019,7 +1018,7 @@ mod tests { leader_sender .send( Recipients::One(me.clone()), - Vote::Notarize(leader_vote).encode().into(), + Vote::Notarize(leader_vote).encode(), true, ) .await @@ -1162,7 +1161,7 @@ mod tests { leader_sender .send( Recipients::One(me.clone()), - Vote::Notarize(leader_vote).encode().into(), + Vote::Notarize(leader_vote).encode(), true, ) .await @@ -1318,7 +1317,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(vote).encode().into(), + Vote::Notarize(vote).encode(), true, ) .await @@ -1364,7 +1363,7 @@ mod tests { sender .send( Recipients::One(me.clone()), - Vote::Notarize(vote).encode().into(), + Vote::Notarize(vote).encode(), true, ) .await diff --git a/consensus/src/simplex/actors/resolver/actor.rs b/consensus/src/simplex/actors/resolver/actor.rs index 24eed2d85d..9b7dc867d1 100644 --- a/consensus/src/simplex/actors/resolver/actor.rs +++ b/consensus/src/simplex/actors/resolver/actor.rs @@ -260,7 +260,7 @@ impl< // the full timeout) return; }; - let _ = response.send(certificate.encode().into()); + let _ = response.send(certificate.encode()); } } } diff --git a/consensus/src/simplex/actors/voter/mod.rs b/consensus/src/simplex/actors/voter/mod.rs index 517bdfaf06..e145a3022c 100644 --- a/consensus/src/simplex/actors/voter/mod.rs +++ b/consensus/src/simplex/actors/voter/mod.rs @@ -2199,7 +2199,7 @@ mod tests { ); let contents = (proposal.round, parent_payload, 0u64).encode(); relay - .broadcast(&leader, (proposal.payload, contents.into())) + .broadcast(&leader, (proposal.payload, contents)) .await; mailbox.proposal(proposal).await; @@ -2391,7 +2391,7 @@ mod tests { // Broadcast payload and send proposal let contents = (proposal3.round, proposal2.payload, 0u64).encode(); - relay.broadcast(&me, (digest3, contents.into())).await; + relay.broadcast(&me, (digest3, contents)).await; mailbox.proposal(proposal3.clone()).await; // Send notarization @@ -2708,7 +2708,7 @@ mod tests { let leader = participants[1].clone(); let contents = (proposal.round, parent_payload, 0u64).encode(); relay - .broadcast(&leader, (proposal.payload, contents.into())) + .broadcast(&leader, (proposal.payload, contents)) .await; mailbox.proposal(proposal.clone()).await; diff --git a/consensus/src/simplex/mocks/application.rs b/consensus/src/simplex/mocks/application.rs index 6698b3e499..7a6bd7b925 100644 --- a/consensus/src/simplex/mocks/application.rs +++ b/consensus/src/simplex/mocks/application.rs @@ -254,7 +254,7 @@ impl Application self.verified.insert(digest); // Store pending payload - self.pending.insert(digest, payload.into()); + self.pending.insert(digest, payload); digest } diff --git a/consensus/src/simplex/mocks/conflicter.rs b/consensus/src/simplex/mocks/conflicter.rs index 4e7a28330c..98c79da687 100644 --- a/consensus/src/simplex/mocks/conflicter.rs +++ b/consensus/src/simplex/mocks/conflicter.rs @@ -61,12 +61,12 @@ where let proposal = Proposal::new(notarize.round(), notarize.proposal.parent, payload); let n = Notarize::::sign(&self.scheme, proposal).unwrap(); - let msg = Vote::Notarize(n).encode().into(); + let msg = Vote::Notarize(n).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); // Notarize received digest let n = Notarize::::sign(&self.scheme, notarize.proposal).unwrap(); - let msg = Vote::Notarize(n).encode().into(); + let msg = Vote::Notarize(n).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } Vote::Finalize(finalize) => { @@ -75,12 +75,12 @@ where let proposal = Proposal::new(finalize.round(), finalize.proposal.parent, payload); let f = Finalize::::sign(&self.scheme, proposal).unwrap(); - let msg = Vote::Finalize(f).encode().into(); + let msg = Vote::Finalize(f).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); // Finalize provided digest let f = Finalize::::sign(&self.scheme, finalize.proposal).unwrap(); - let msg = Vote::Finalize(f).encode().into(); + let msg = Vote::Finalize(f).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } _ => continue, diff --git a/consensus/src/simplex/mocks/equivocator.rs b/consensus/src/simplex/mocks/equivocator.rs index 08076d2e6e..f7f1bdfd61 100644 --- a/consensus/src/simplex/mocks/equivocator.rs +++ b/consensus/src/simplex/mocks/equivocator.rs @@ -140,15 +140,15 @@ impl, L: ElectorConfig, H: Has // Broadcast payloads via relay so nodes can verify let me = &self.scheme.participants()[self.scheme.me().unwrap() as usize]; - self.relay.broadcast(me, (digest_a, payload_a.into())).await; - self.relay.broadcast(me, (digest_b, payload_b.into())).await; + self.relay.broadcast(me, (digest_a, payload_a)).await; + self.relay.broadcast(me, (digest_b, payload_b)).await; // Notarize proposal A and send it to victim only let notarize_a = Notarize::::sign(&self.scheme, proposal_a).expect("sign failed"); vote_sender .send( Recipients::One(victim.clone()), - Vote::Notarize(notarize_a).encode().into(), + Vote::Notarize(notarize_a).encode(), true, ) .await @@ -167,7 +167,7 @@ impl, L: ElectorConfig, H: Has vote_sender .send( Recipients::Some(non_victims), - Vote::Notarize(notarize_b).encode().into(), + Vote::Notarize(notarize_b).encode(), true, ) .await diff --git a/consensus/src/simplex/mocks/impersonator.rs b/consensus/src/simplex/mocks/impersonator.rs index a438de5a31..4100a28bbd 100644 --- a/consensus/src/simplex/mocks/impersonator.rs +++ b/consensus/src/simplex/mocks/impersonator.rs @@ -65,7 +65,7 @@ impl, H: Hasher } // Send invalid message - let msg = Vote::Notarize(n).encode().into(); + let msg = Vote::Notarize(n).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } Vote::Finalize(finalize) => { @@ -80,7 +80,7 @@ impl, H: Hasher } // Send invalid message - let msg = Vote::Finalize(f).encode().into(); + let msg = Vote::Finalize(f).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } _ => continue, diff --git a/consensus/src/simplex/mocks/nuller.rs b/consensus/src/simplex/mocks/nuller.rs index d5f9aa1cab..b4a8acddcc 100644 --- a/consensus/src/simplex/mocks/nuller.rs +++ b/consensus/src/simplex/mocks/nuller.rs @@ -56,13 +56,13 @@ where Vote::Notarize(notarize) => { // Nullify let n = Nullify::sign(&self.scheme, notarize.round()).unwrap(); - let msg = Vote::::Nullify(n).encode().into(); + let msg = Vote::::Nullify(n).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); // Finalize digest let proposal = notarize.proposal; let f = Finalize::::sign(&self.scheme, proposal).unwrap(); - let msg = Vote::Finalize(f).encode().into(); + let msg = Vote::Finalize(f).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } _ => continue, diff --git a/consensus/src/simplex/mocks/nullify_only.rs b/consensus/src/simplex/mocks/nullify_only.rs index 8fdab3cb82..1040138dc2 100644 --- a/consensus/src/simplex/mocks/nullify_only.rs +++ b/consensus/src/simplex/mocks/nullify_only.rs @@ -53,7 +53,7 @@ impl, H: Hasher> NullifyOnly { // Respond with only a `Nullify` vote when a proposal is observed. if let Vote::Notarize(notarize) = msg { let nullify = Nullify::sign::(&self.scheme, notarize.round()).unwrap(); - let msg = Vote::::Nullify(nullify).encode().into(); + let msg = Vote::::Nullify(nullify).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } } diff --git a/consensus/src/simplex/mocks/outdated.rs b/consensus/src/simplex/mocks/outdated.rs index c3a9f3b6e1..5c6deef270 100644 --- a/consensus/src/simplex/mocks/outdated.rs +++ b/consensus/src/simplex/mocks/outdated.rs @@ -79,7 +79,7 @@ where }; debug!(%view, "notarizing old proposal"); let n = Notarize::::sign(&self.scheme, proposal.clone()).unwrap(); - let msg = Vote::Notarize(n).encode().into(); + let msg = Vote::Notarize(n).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } Vote::Finalize(finalize) => { @@ -93,7 +93,7 @@ where }; debug!(%view, "finalizing old proposal"); let f = Finalize::::sign(&self.scheme, proposal.clone()).unwrap(); - let msg = Vote::Finalize(f).encode().into(); + let msg = Vote::Finalize(f).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } _ => continue, diff --git a/consensus/src/simplex/mocks/reconfigurer.rs b/consensus/src/simplex/mocks/reconfigurer.rs index ab947a367e..d94a4d4f69 100644 --- a/consensus/src/simplex/mocks/reconfigurer.rs +++ b/consensus/src/simplex/mocks/reconfigurer.rs @@ -64,7 +64,7 @@ where // Sign and broadcast let n = Notarize::sign(&self.scheme, proposal).unwrap(); - let msg = Vote::Notarize(n).encode().into(); + let msg = Vote::Notarize(n).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } Vote::Finalize(finalize) => { @@ -76,7 +76,7 @@ where // Sign and broadcast let f = Finalize::sign(&self.scheme, proposal).unwrap(); - let msg = Vote::Finalize(f).encode().into(); + let msg = Vote::Finalize(f).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } Vote::Nullify(nullify) => { @@ -86,7 +86,7 @@ where let new_round = (new_epoch, old_round.view()).into(); let n = Nullify::sign(&self.scheme, new_round).unwrap(); - let msg = Vote::::Nullify(n).encode().into(); + let msg = Vote::::Nullify(n).encode(); sender.send(Recipients::All, msg, true).await.unwrap(); } } diff --git a/consensus/src/simplex/mod.rs b/consensus/src/simplex/mod.rs index a8104c34be..be2cad6b7d 100644 --- a/consensus/src/simplex/mod.rs +++ b/consensus/src/simplex/mod.rs @@ -4615,8 +4615,8 @@ mod tests { for (i, participant) in participants.iter().enumerate() { let recipient = Recipients::One(participant.clone()); let msg = match get_type(i) { - ParticipantType::Group2 => notarization_msg.encode().into(), - _ => nullification_msg.encode().into(), + ParticipantType::Group2 => notarization_msg.encode(), + _ => nullification_msg.encode(), }; injector_sender.send(recipient, msg, true).await.unwrap(); } @@ -4626,22 +4626,18 @@ mod tests { for (i, participant) in participants.iter().enumerate() { let recipient = Recipients::One(participant.clone()); let msg = match get_type(i) { - ParticipantType::Group1 => notarization_msg.encode().into(), - _ => nullification_msg.encode().into(), + ParticipantType::Group1 => notarization_msg.encode(), + _ => nullification_msg.encode(), }; injector_sender.send(recipient, msg, true).await.unwrap(); } // View F: - let msg = Certificate::<_, D>::Notarization(b0_notarization) - .encode() - .into(); + let msg = Certificate::<_, D>::Notarization(b0_notarization).encode(); injector_sender .send(Recipients::All, msg, true) .await .unwrap(); - let msg = Certificate::<_, D>::Finalization(b0_finalization) - .encode() - .into(); + let msg = Certificate::<_, D>::Finalization(b0_finalization).encode(); injector_sender .send(Recipients::All, msg, true) .await diff --git a/consensus/src/simplex/scheme/bls12381_threshold.rs b/consensus/src/simplex/scheme/bls12381_threshold.rs index ce24881a93..248b2f5fa7 100644 --- a/consensus/src/simplex/scheme/bls12381_threshold.rs +++ b/consensus/src/simplex/scheme/bls12381_threshold.rs @@ -426,10 +426,8 @@ impl Seedable for Finalization(subject: &Subject<'_, D>) -> bytes::Bytes { match subject { - Subject::Notarize { proposal } | Subject::Finalize { proposal } => { - proposal.round.encode().freeze() - } - Subject::Nullify { round } => round.encode().freeze(), + Subject::Notarize { proposal } | Subject::Finalize { proposal } => proposal.round.encode(), + Subject::Nullify { round } => round.encode(), } } @@ -1109,8 +1107,7 @@ mod tests { let certificate = schemes[0].assemble(votes).expect("assemble certificate"); let encoded = certificate.encode(); - let decoded = - Signature::::decode_cfg(encoded.freeze(), &()).expect("decode certificate"); + let decoded = Signature::::decode_cfg(encoded, &()).expect("decode certificate"); assert_eq!(decoded, certificate); } @@ -1375,7 +1372,7 @@ mod tests { let certificate = schemes[0].assemble(votes).expect("assemble certificate"); - let mut encoded = certificate.encode().freeze(); + let mut encoded = certificate.encode(); let truncated = encoded.split_to(encoded.len() - 1); assert!(Signature::::decode_cfg(truncated, &()).is_err()); } diff --git a/consensus/src/simplex/scheme/mod.rs b/consensus/src/simplex/scheme/mod.rs index fdb7225348..d656b686ee 100644 --- a/consensus/src/simplex/scheme/mod.rs +++ b/consensus/src/simplex/scheme/mod.rs @@ -81,9 +81,9 @@ impl<'a, D: Digest> certificate::Subject for Subject<'a, D> { fn message(&self) -> Bytes { match self { - Self::Notarize { proposal } => proposal.encode().freeze(), - Self::Nullify { round } => round.encode().freeze(), - Self::Finalize { proposal } => proposal.encode().freeze(), + Self::Notarize { proposal } => proposal.encode(), + Self::Nullify { round } => round.encode(), + Self::Finalize { proposal } => proposal.encode(), } } } diff --git a/cryptography/src/bls12381/dkg.rs b/cryptography/src/bls12381/dkg.rs index 51081a847a..d9e6ffd80c 100644 --- a/cryptography/src/bls12381/dkg.rs +++ b/cryptography/src/bls12381/dkg.rs @@ -1610,7 +1610,7 @@ mod test_plan { &self, info: &Info, ) -> anyhow::Result<(bool, Transcript)> { - let mut summary_bs = info.summary.encode(); + let mut summary_bs = info.summary.encode_mut(); let modified = apply_mask(&mut summary_bs, &self.info_summary); let summary = Summary::read(&mut summary_bs)?; Ok((modified, Transcript::resume(summary))) @@ -1625,11 +1625,11 @@ mod test_plan { let (mut modified, transcript) = self.transcript_for_round(info)?; let mut transcript = transcript.fork(SIG_ACK); - let mut dealer_bs = dealer.encode(); + let mut dealer_bs = dealer.encode_mut(); modified |= apply_mask(&mut dealer_bs, &self.dealer); transcript.commit(&mut dealer_bs); - let mut pub_msg_bs = pub_msg.encode(); + let mut pub_msg_bs = pub_msg.encode_mut(); modified |= apply_mask(&mut pub_msg_bs, &self.pub_msg); transcript.commit(&mut pub_msg_bs); @@ -1644,7 +1644,7 @@ mod test_plan { let (mut modified, transcript) = self.transcript_for_round(info)?; let mut transcript = transcript.fork(SIG_LOG); - let mut log_bs = log.encode(); + let mut log_bs = log.encode_mut(); modified |= apply_mask(&mut log_bs, &self.log); transcript.commit(&mut log_bs); diff --git a/examples/bridge/src/application/actor.rs b/examples/bridge/src/application/actor.rs index a03cee8854..54c6222d5c 100644 --- a/examples/bridge/src/application/actor.rs +++ b/examples/bridge/src/application/actor.rs @@ -84,7 +84,7 @@ impl Application Application Application Application::Success(success).encode(); - if sender.send(&msg).await.is_err() { + if sender.send(msg).await.is_err() { debug!(?peer, "failed to send message"); return; } @@ -304,14 +304,14 @@ fn main() { match response { Some(block) => { let msg = Outbound::Block(block).encode(); - if sender.send(&msg).await.is_err() { + if sender.send(msg).await.is_err() { debug!(?peer, "failed to send message"); return; } } None => { let msg = Outbound::::Success(false).encode(); - if sender.send(&msg).await.is_err() { + if sender.send(msg).await.is_err() { debug!(?peer, "failed to send message"); return; } @@ -329,7 +329,7 @@ fn main() { .expect("failed to send message"); let success = receiver.await.expect("failed to receive response"); let msg = Outbound::::Success(success).encode(); - if sender.send(&msg).await.is_err() { + if sender.send(msg).await.is_err() { debug!(?peer, "failed to send message"); return; } @@ -347,14 +347,14 @@ fn main() { match response { Some(data) => { let msg = Outbound::Finalization(data).encode(); - if sender.send(&msg).await.is_err() { + if sender.send(msg).await.is_err() { debug!(?peer, "failed to send message"); return; } } None => { let msg = Outbound::::Success(false).encode(); - if sender.send(&msg).await.is_err() { + if sender.send(msg).await.is_err() { debug!(?peer, "failed to send message"); return; } diff --git a/examples/chat/src/handler.rs b/examples/chat/src/handler.rs index 58be5e9d88..b56dda6b48 100644 --- a/examples/chat/src/handler.rs +++ b/examples/chat/src/handler.rs @@ -259,7 +259,7 @@ pub async fn run( continue; } let mut successful = sender - .send(Recipients::All, input.clone().into_bytes().into(), false) + .send(Recipients::All, input.as_bytes(), false) .await .expect("failed to send message"); if !successful.is_empty() { diff --git a/examples/flood/src/bin/flood.rs b/examples/flood/src/bin/flood.rs index 10ca388f9d..069e3b27b7 100644 --- a/examples/flood/src/bin/flood.rs +++ b/examples/flood/src/bin/flood.rs @@ -152,7 +152,7 @@ fn main() { rng.fill_bytes(&mut msg); // Send to all peers - if let Err(e) = flood_sender.send(Recipients::All, msg.into(), true).await { + if let Err(e) = flood_sender.send(Recipients::All, &msg[..], true).await { error!(?e, "could not send flood message"); } messages.inc(); diff --git a/examples/reshare/src/dkg/actor.rs b/examples/reshare/src/dkg/actor.rs index 8d4fd3328a..a8f3acb429 100644 --- a/examples/reshare/src/dkg/actor.rs +++ b/examples/reshare/src/dkg/actor.rs @@ -356,7 +356,7 @@ where ) .await; if let Some(ack) = response { - let payload = Message::::Ack(ack).encode().freeze(); + let payload = Message::::Ack(ack).encode(); if let Err(e) = round_sender .send(Recipients::One(sender_pk.clone()), payload, true) .await @@ -574,9 +574,7 @@ where } // Send to remote player - let payload = Message::::Dealer(pub_msg, priv_msg) - .encode() - .freeze(); + let payload = Message::::Dealer(pub_msg, priv_msg).encode(); match sender .send(Recipients::One(player.clone()), payload, true) .await diff --git a/examples/sync/src/bin/server.rs b/examples/sync/src/bin/server.rs index b5482afcda..488c8af309 100644 --- a/examples/sync/src/bin/server.rs +++ b/examples/sync/src/bin/server.rs @@ -346,8 +346,8 @@ where outgoing = response_receiver.next() => { if let Some(response) = outgoing { // We have a response to send to the client. - let response_data = response.encode().to_vec(); - if let Err(err) = send_frame(&mut sink, &response_data, MAX_MESSAGE_SIZE).await { + let response_data = response.encode(); + if let Err(err) = send_frame(&mut sink, response_data, MAX_MESSAGE_SIZE).await { info!(client_addr = %client_addr, ?err, "send failed (client likely disconnected)"); state.error_counter.inc(); return Ok(()); diff --git a/examples/sync/src/net/io.rs b/examples/sync/src/net/io.rs index f881f61499..1d378bcce9 100644 --- a/examples/sync/src/net/io.rs +++ b/examples/sync/src/net/io.rs @@ -45,8 +45,8 @@ async fn run_loop( Some(Request { request, response_tx }) => { let request_id = request.request_id(); pending_requests.insert(request_id, response_tx); - let data = request.encode().to_vec(); - if let Err(e) = send_frame(&mut sink, &data, MAX_MESSAGE_SIZE).await { + let data = request.encode(); + if let Err(e) = send_frame(&mut sink, data, MAX_MESSAGE_SIZE).await { if let Some(sender) = pending_requests.remove(&request_id) { let _ = sender.send(Err(Error::Network(e))); } diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index ee7a3d7f95..5426a64217 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -329,8 +329,8 @@ mod tests { }; // Wait for some message or drop. - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); // Additional attempts should be rate limited immediately. @@ -338,8 +338,8 @@ mod tests { let (sink, mut stream) = context.dial(address).await.expect("dial"); // Wait for some message or drop. - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); } @@ -472,8 +472,8 @@ mod tests { }; // Wait for some message or drop - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); // Check metrics - should be blocked because it's a private IP diff --git a/p2p/src/authenticated/discovery/actors/peer/actor.rs b/p2p/src/authenticated/discovery/actors/peer/actor.rs index 5082c4a316..8ecf6a0b66 100644 --- a/p2p/src/authenticated/discovery/actors/peer/actor.rs +++ b/p2p/src/authenticated/discovery/actors/peer/actor.rs @@ -90,7 +90,7 @@ impl Actor { payload: types::Payload, ) -> Result<(), Error> { let msg = payload.encode(); - sender.send(&msg).await.map_err(Error::SendFailed)?; + sender.send(msg).await.map_err(Error::SendFailed)?; sent_messages.get_or_create(&metric).inc(); Ok(()) } @@ -482,7 +482,7 @@ mod tests { bits: BitMap::ones(10), }); local_sender - .send(&bit_vec.encode()) + .send(bit_vec.encode()) .await .expect("send failed"); @@ -578,14 +578,14 @@ mod tests { // Send first greeting (valid) let first_greeting = types::Payload::::Greeting(greeting.clone()); local_sender - .send(&first_greeting.encode()) + .send(first_greeting.encode()) .await .expect("send failed"); // Send second greeting (should cause error) let second_greeting = types::Payload::::Greeting(greeting.clone()); local_sender - .send(&second_greeting.encode()) + .send(second_greeting.encode()) .await .expect("send failed"); @@ -690,7 +690,7 @@ mod tests { wrong_greeting.public_key = wrong_pk; let greeting_payload = types::Payload::::Greeting(wrong_greeting); local_sender - .send(&greeting_payload.encode()) + .send(greeting_payload.encode()) .await .expect("send failed"); diff --git a/p2p/src/authenticated/discovery/actors/router/ingress.rs b/p2p/src/authenticated/discovery/actors/router/ingress.rs index ee055de0b0..d22beadb16 100644 --- a/p2p/src/authenticated/discovery/actors/router/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/router/ingress.rs @@ -3,7 +3,7 @@ use crate::{ utils::limited::Connected, Channel, Recipients, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use commonware_cryptography::PublicKey; use commonware_utils::channels::ring; use futures::channel::oneshot; @@ -74,7 +74,7 @@ impl Messenger

{ &mut self, recipients: Recipients

, channel: Channel, - message: Bytes, + mut message: impl Buf + Send, priority: bool, ) -> Vec

{ let (sender, receiver) = oneshot::channel(); @@ -82,7 +82,7 @@ impl Messenger

{ .send(Message::Content { recipients, channel, - message, + message: message.copy_to_bytes(message.remaining()), priority, success: sender, }) diff --git a/p2p/src/authenticated/discovery/actors/tracker/actor.rs b/p2p/src/authenticated/discovery/actors/tracker/actor.rs index 01e61adb83..6a638b2f92 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/actor.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/actor.rs @@ -348,7 +348,7 @@ mod tests { let mut signature = signer.sign(ip_namespace, &(ingress.clone(), timestamp).encode()); if make_sig_invalid && !signature.as_ref().is_empty() { - let mut sig_bytes = signature.encode(); + let mut sig_bytes = signature.encode_mut(); sig_bytes[0] = sig_bytes[0].wrapping_add(1); signature = Signature::decode(sig_bytes).unwrap(); } diff --git a/p2p/src/authenticated/discovery/channels.rs b/p2p/src/authenticated/discovery/channels.rs index 2e7198f594..e9f1f34ad1 100644 --- a/p2p/src/authenticated/discovery/channels.rs +++ b/p2p/src/authenticated/discovery/channels.rs @@ -3,7 +3,7 @@ use crate::{ utils::limited::{CheckedSender, LimitedSender}, Channel, Message, Recipients, }; -use bytes::Bytes; +use bytes::Buf; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, Quota}; use futures::{channel::mpsc, StreamExt}; @@ -26,11 +26,11 @@ impl crate::UnlimitedSender for UnlimitedSender

{ async fn send( &mut self, recipients: Recipients, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> Result, Self::Error> { - if message.len() > self.max_size as usize { - return Err(Error::MessageTooLarge(message.len())); + if message.remaining() > self.max_size as usize { + return Err(Error::MessageTooLarge(message.remaining())); } Ok(self diff --git a/p2p/src/authenticated/discovery/mod.rs b/p2p/src/authenticated/discovery/mod.rs index 98f8f0adc9..d34c81df78 100644 --- a/p2p/src/authenticated/discovery/mod.rs +++ b/p2p/src/authenticated/discovery/mod.rs @@ -369,7 +369,7 @@ mod tests { let sent = sender .send( Recipients::One(recipient.clone()), - msg.to_vec().into(), + msg.as_ref(), true, ) .await @@ -394,7 +394,7 @@ mod tests { let mut sent = sender .send( Recipients::Some(recipients.clone()), - msg.to_vec().into(), + msg.as_ref(), true, ) .await @@ -419,7 +419,7 @@ mod tests { // Loop until all peer sends successful loop { let mut sent = sender - .send(Recipients::All, msg.to_vec().into(), true) + .send(Recipients::All, msg.as_ref(), true) .await .unwrap(); if sent.len() != n - 1 { @@ -609,7 +609,7 @@ mod tests { let msg = signer.public_key(); loop { if sender - .send(Recipients::All, msg.to_vec().into(), true) + .send(Recipients::All, msg.as_ref(), true) .await .unwrap() .len() @@ -684,7 +684,7 @@ mod tests { // Send message let recipient = Recipients::One(addresses[1].clone()); - let result = sender.send(recipient, msg.into(), true).await; + let result = sender.send(recipient, &msg[..], true).await; assert!(matches!(result, Err(Error::MessageTooLarge(_)))); }); } @@ -744,11 +744,7 @@ mod tests { loop { // Confirm message is sent to peer let sent = sender0 - .send( - Recipients::One(addresses[1].clone()), - msg.clone().into(), - true, - ) + .send(Recipients::One(addresses[1].clone()), &msg[..], true) .await .unwrap(); if !sent.is_empty() { @@ -764,7 +760,7 @@ mod tests { // With partial sends, rate-limited recipients return empty vec (not error). // Outbound rate limiting skips the peer, returns empty vec. let sent = sender0 - .send(Recipients::One(addresses[1].clone()), msg.into(), true) + .send(Recipients::One(addresses[1].clone()), &msg[..], true) .await .unwrap(); assert!(sent.is_empty()); @@ -903,7 +899,7 @@ mod tests { let msg = signer.public_key(); loop { let sent = sender - .send(Recipients::All, msg.to_vec().into(), true) + .send(Recipients::All, msg.as_ref(), true) .await .unwrap(); if sent.len() >= expected_connections { @@ -1203,7 +1199,7 @@ mod tests { loop { let mut sent = sender - .send(Recipients::All, msg.to_vec().into(), true) + .send(Recipients::All, msg.as_ref(), true) .await .unwrap(); if sent.len() != n - 1 { @@ -1288,11 +1284,7 @@ mod tests { // Verify peer 0 cannot send to peer 1 yet let sent = sender0 - .send( - Recipients::One(peer1.public_key()), - b"test".to_vec().into(), - true, - ) + .send(Recipients::One(peer1.public_key()), b"test".as_ref(), true) .await .unwrap(); assert!(sent.is_empty(), "should not be connected yet"); @@ -1332,11 +1324,11 @@ mod tests { move |context| async move { loop { let sent0 = sender0 - .send(Recipients::One(pk1.clone()), pk0.to_vec().into(), true) + .send(Recipients::One(pk1.clone()), pk0.as_ref(), true) .await .unwrap(); let sent1 = sender1 - .send(Recipients::One(pk0.clone()), pk1.to_vec().into(), true) + .send(Recipients::One(pk0.clone()), pk1.as_ref(), true) .await .unwrap(); if !sent0.is_empty() && !sent1.is_empty() { @@ -1434,7 +1426,7 @@ mod tests { loop { let mut sent = sender - .send(Recipients::All, msg.to_vec().into(), true) + .send(Recipients::All, msg.as_ref(), true) .await .unwrap(); if sent.len() != n - 1 { @@ -1526,7 +1518,7 @@ mod tests { // Try to send from peer 1 - should not reach anyone since private IPs are blocked let sent = sender1 - .send(Recipients::All, peer1.public_key().to_vec().into(), true) + .send(Recipients::All, peer1.public_key().as_ref(), true) .await .unwrap(); assert!( @@ -1625,7 +1617,7 @@ mod tests { let sent = sender1 .send( Recipients::One(pk0.clone()), - peer1.public_key().to_vec().into(), + peer1.public_key().as_ref(), true, ) .await @@ -1707,7 +1699,7 @@ mod tests { let sent = sender .send( Recipients::All, - peers[i].public_key().to_vec().into(), + peers[i].public_key().as_ref(), true, ) .await @@ -1756,7 +1748,7 @@ mod tests { let sent = peer0_sender .send( Recipients::One(addresses[restart_peer_idx].clone()), - addresses[0].to_vec().into(), + addresses[0].as_ref(), true, ) .await @@ -1800,7 +1792,7 @@ mod tests { let sent = restarted_sender .send( Recipients::All, - peers[restart_peer_idx].public_key().to_vec().into(), + peers[restart_peer_idx].public_key().as_ref(), true, ) .await @@ -1821,7 +1813,7 @@ mod tests { let sent = sender .send( Recipients::One(addresses[restart_peer_idx].clone()), - peers[i].public_key().to_vec().into(), + peers[i].public_key().as_ref(), true, ) .await @@ -1921,7 +1913,7 @@ mod tests { let sender = sender.as_mut().unwrap(); loop { let sent = sender - .send(Recipients::All, peers[i].public_key().to_vec().into(), true) + .send(Recipients::All, peers[i].public_key().as_ref(), true) .await .unwrap(); if sent.len() == n - 1 { @@ -1966,7 +1958,7 @@ mod tests { let sent = peer0_sender .send( Recipients::One(addresses[restart_peer_idx].clone()), - addresses[0].to_vec().into(), + addresses[0].as_ref(), true, ) .await @@ -2016,7 +2008,7 @@ mod tests { let sent = restarted_sender .send( Recipients::All, - peers[restart_peer_idx].public_key().to_vec().into(), + peers[restart_peer_idx].public_key().as_ref(), true, ) .await @@ -2037,7 +2029,7 @@ mod tests { let sent = sender .send( Recipients::One(addresses[restart_peer_idx].clone()), - peers[i].public_key().to_vec().into(), + peers[i].public_key().as_ref(), true, ) .await diff --git a/p2p/src/authenticated/discovery/types.rs b/p2p/src/authenticated/discovery/types.rs index 09f6d7cbcc..47f3438652 100644 --- a/p2p/src/authenticated/discovery/types.rs +++ b/p2p/src/authenticated/discovery/types.rs @@ -384,7 +384,7 @@ impl InfoVerifier { #[cfg(test)] mod tests { use super::*; - use bytes::{Bytes, BytesMut}; + use bytes::Bytes; use commonware_codec::{Decode, DecodeExt}; use commonware_cryptography::secp256r1::standard::{PrivateKey, PublicKey}; use commonware_math::algebra::Random; @@ -469,7 +469,7 @@ mod tests { index: 1234, bits: BitMap::ones(100), }; - let encoded: BytesMut = Payload::::BitVec(original.clone()).encode(); + let encoded: Bytes = Payload::::BitVec(original.clone()).encode(); let decoded = match Payload::::decode_cfg(encoded, &cfg) { Ok(Payload::::BitVec(b)) => b, _ => panic!(), diff --git a/p2p/src/authenticated/lookup/actors/listener.rs b/p2p/src/authenticated/lookup/actors/listener.rs index 899d325ef5..b1a739c5fb 100644 --- a/p2p/src/authenticated/lookup/actors/listener.rs +++ b/p2p/src/authenticated/lookup/actors/listener.rs @@ -364,8 +364,8 @@ mod tests { }; // Wait for some message or drop - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); // Additional attempts should be rate limited immediately @@ -373,8 +373,8 @@ mod tests { let (sink, mut stream) = context.dial(address).await.expect("dial"); // Wait for some message or drop - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); } @@ -525,8 +525,8 @@ mod tests { }; // Wait for some message or drop - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); // Check metrics @@ -606,8 +606,8 @@ mod tests { }; // Wait for some message or drop - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); // Check metrics @@ -695,8 +695,8 @@ mod tests { }; // Wait for some message or drop - let buf = vec![0u8; 1]; - let _ = stream.recv(buf).await; + let mut buf = [0u8; 1]; + let _ = stream.recv(&mut buf[..]).await; drop((sink, stream)); // Check metrics - should be blocked because it's a private IP diff --git a/p2p/src/authenticated/lookup/actors/peer/actor.rs b/p2p/src/authenticated/lookup/actors/peer/actor.rs index 5e01816843..12849a5af8 100644 --- a/p2p/src/authenticated/lookup/actors/peer/actor.rs +++ b/p2p/src/authenticated/lookup/actors/peer/actor.rs @@ -78,7 +78,7 @@ impl Actor { payload: types::Message, ) -> Result<(), Error> { let msg = payload.encode(); - sender.send(&msg).await.map_err(Error::SendFailed)?; + sender.send(msg).await.map_err(Error::SendFailed)?; sent_messages.get_or_create(&metric).inc(); Ok(()) } diff --git a/p2p/src/authenticated/lookup/actors/router/ingress.rs b/p2p/src/authenticated/lookup/actors/router/ingress.rs index 058b13beea..56b128bf85 100644 --- a/p2p/src/authenticated/lookup/actors/router/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/router/ingress.rs @@ -3,7 +3,7 @@ use crate::{ utils::limited::Connected, Channel, Recipients, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use commonware_cryptography::PublicKey; use commonware_utils::channels::ring; use futures::channel::oneshot; @@ -74,7 +74,7 @@ impl Messenger

{ &mut self, recipients: Recipients

, channel: Channel, - message: Bytes, + mut message: impl Buf + Send, priority: bool, ) -> Vec

{ let (sender, receiver) = oneshot::channel(); @@ -82,7 +82,7 @@ impl Messenger

{ .send(Message::Content { recipients, channel, - message, + message: message.copy_to_bytes(message.remaining()), priority, success: sender, }) diff --git a/p2p/src/authenticated/lookup/channels.rs b/p2p/src/authenticated/lookup/channels.rs index b24929b9d8..fa78e472b5 100644 --- a/p2p/src/authenticated/lookup/channels.rs +++ b/p2p/src/authenticated/lookup/channels.rs @@ -4,7 +4,7 @@ use crate::{ utils::limited::{CheckedSender, LimitedSender}, Channel, Message, Recipients, }; -use bytes::Bytes; +use bytes::Buf; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, Quota}; use futures::{channel::mpsc, StreamExt}; @@ -27,11 +27,11 @@ impl crate::UnlimitedSender for UnlimitedSender

{ async fn send( &mut self, recipients: Recipients, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> Result, Self::Error> { - if message.len() > self.max_size as usize { - return Err(Error::MessageTooLarge(message.len())); + if message.remaining() > self.max_size as usize { + return Err(Error::MessageTooLarge(message.remaining())); } Ok(self diff --git a/p2p/src/authenticated/lookup/mod.rs b/p2p/src/authenticated/lookup/mod.rs index 1280f7825d..4ab39e314d 100644 --- a/p2p/src/authenticated/lookup/mod.rs +++ b/p2p/src/authenticated/lookup/mod.rs @@ -314,7 +314,7 @@ mod tests { let sent = sender .send( Recipients::One(pub_key.clone()), - public_key.to_vec().into(), + public_key.as_ref(), true, ) .await @@ -339,7 +339,7 @@ mod tests { let mut sent = sender .send( Recipients::Some(public_keys.clone()), - public_key.to_vec().into(), + public_key.as_ref(), true, ) .await @@ -364,11 +364,7 @@ mod tests { // Loop until all peer sends successful loop { let mut sent = sender - .send( - Recipients::All, - public_key.to_vec().into(), - true, - ) + .send(Recipients::All, public_key.as_ref(), true) .await .unwrap(); if sent.len() != n - 1 { @@ -545,7 +541,7 @@ mod tests { // Loop until success loop { if sender - .send(Recipients::All, msg.to_vec().into(), true) + .send(Recipients::All, msg.as_ref(), true) .await .unwrap() .len() @@ -627,7 +623,7 @@ mod tests { // Send message let recipient = Recipients::One(peers[1].clone()); - let result = sender.send(recipient, msg.into(), true).await; + let result = sender.send(recipient, &msg[..], true).await; assert!(matches!(result, Err(Error::MessageTooLarge(_)))); }); } @@ -678,7 +674,7 @@ mod tests { loop { // Confirm message is sent to peer let sent = sender0 - .send(Recipients::One(pk1.clone()), msg.clone().into(), true) + .send(Recipients::One(pk1.clone()), &msg[..], true) .await .unwrap(); if !sent.is_empty() { @@ -694,7 +690,7 @@ mod tests { // With partial sends, rate-limited recipients return empty vec (not error). // Outbound rate limiting skips the peer, returns empty vec. let sent = sender0 - .send(Recipients::One(pk1), msg.into(), true) + .send(Recipients::One(pk1), &msg[..], true) .await .unwrap(); assert!(sent.is_empty()); @@ -817,7 +813,7 @@ mod tests { // Send a message loop { let sent = sender - .send(Recipients::All, pk.to_vec().into(), true) + .send(Recipients::All, pk.as_ref(), true) .await .unwrap(); if sent.len() >= expected_connections { @@ -1124,7 +1120,7 @@ mod tests { loop { let mut sent = sender - .send(Recipients::All, pk.to_vec().into(), true) + .send(Recipients::All, pk.as_ref(), true) .await .unwrap(); if sent.len() != n - 1 { @@ -1254,7 +1250,7 @@ mod tests { loop { let mut sent = sender - .send(Recipients::All, pk.to_vec().into(), true) + .send(Recipients::All, pk.as_ref(), true) .await .unwrap(); if sent.len() != n - 1 { @@ -1349,7 +1345,7 @@ mod tests { // Try to send from peer 1 - should not reach anyone since private IPs are blocked let sent = sender1 - .send(Recipients::All, peer1.public_key().to_vec().into(), true) + .send(Recipients::All, peer1.public_key().as_ref(), true) .await .unwrap(); assert!( @@ -1452,7 +1448,7 @@ mod tests { let sent = sender1 .send( Recipients::One(pk0.clone()), - peer1.public_key().to_vec().into(), + peer1.public_key().as_ref(), true, ) .await diff --git a/p2p/src/authenticated/lookup/types.rs b/p2p/src/authenticated/lookup/types.rs index e11416b178..5d0b6d6ad5 100644 --- a/p2p/src/authenticated/lookup/types.rs +++ b/p2p/src/authenticated/lookup/types.rs @@ -95,7 +95,7 @@ mod tests { channel: 7, message: Bytes::from_static(b"ping"), }); - let encoded = payload.encode().freeze(); + let encoded = payload.encode(); let decoded = Message::decode_cfg(encoded, &4).expect("within limit"); match decoded { @@ -113,7 +113,7 @@ mod tests { channel: 9, message: Bytes::from_static(b"hello"), }); - let encoded = payload.encode().freeze(); + let encoded = payload.encode(); let result = Message::decode_cfg(encoded, &4); assert!(matches!(result, Err(Error::InvalidLength(5)))); diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index d245c45d0e..6fae6902bc 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -10,7 +10,7 @@ html_favicon_url = "https://commonware.xyz/favicon.ico" )] -use bytes::Bytes; +use bytes::{Buf, Bytes}; use commonware_cryptography::PublicKey; use commonware_utils::ordered::Set; use futures::channel::mpsc; @@ -66,7 +66,7 @@ pub trait UnlimitedSender: Clone + Send + Sync + 'static { fn send( &mut self, recipients: Recipients, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> impl Future, Self::Error>> + Send; } @@ -126,7 +126,7 @@ pub trait CheckedSender: Send { /// receive the message. fn send( self, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> impl Future, Self::Error>> + Send; } @@ -157,7 +157,7 @@ pub trait Sender: LimitedSender { fn send( &mut self, recipients: Recipients, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> impl Future< Output = Result, as CheckedSender>::Error>, diff --git a/p2p/src/simulated/mod.rs b/p2p/src/simulated/mod.rs index 5ac616519c..fb65f8e428 100644 --- a/p2p/src/simulated/mod.rs +++ b/p2p/src/simulated/mod.rs @@ -365,7 +365,7 @@ mod tests { let mut msg = vec![0u8; 1024 * 1024 + 1]; context.fill(&mut msg[..]); let result = message_sender - .send(Recipients::All, msg.into(), false) + .send(Recipients::All, &msg[..], false) .await .unwrap_err(); diff --git a/p2p/src/simulated/network.rs b/p2p/src/simulated/network.rs index 9a6e9c8c91..09261f8bcd 100644 --- a/p2p/src/simulated/network.rs +++ b/p2p/src/simulated/network.rs @@ -10,7 +10,7 @@ use crate::{ utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender}, Channel, Message, Recipients, UnlimitedSender as _, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use commonware_codec::{DecodeExt, FixedSize}; use commonware_cryptography::PublicKey; use commonware_macros::{select, select_loop}; @@ -775,17 +775,18 @@ impl crate::UnlimitedSender for UnlimitedSender

{ async fn send( &mut self, recipients: Recipients

, - message: Bytes, + mut message: impl Buf + Send, priority: bool, ) -> Result, Error> { // Check message size - if message.len() > self.max_size as usize { - return Err(Error::MessageTooLarge(message.len())); + if message.remaining() > self.max_size as usize { + return Err(Error::MessageTooLarge(message.remaining())); } // Send message let (sender, receiver) = oneshot::channel(); let channel = if priority { &self.high } else { &self.low }; + let message = message.copy_to_bytes(message.remaining()); channel .unbounded_send((self.channel, self.me.clone(), recipients, message, sender)) .map_err(|_| Error::NetworkClosed)?; @@ -972,9 +973,12 @@ impl<'a, P: PublicKey, E: Clock, F: SplitForwarder

> crate::CheckedSender async fn send( self, - message: Bytes, + mut message: impl Buf + Send, priority: bool, ) -> Result, Self::Error> { + // Convert to Bytes here since forwarder needs to inspect the message + let message = message.copy_to_bytes(message.remaining()); + // Determine the set of recipients that will receive the message let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else { return Ok(Vec::new()); @@ -1250,7 +1254,7 @@ impl Link { context.with_label("link").spawn(move |context| async move { // Dial the peer and handshake by sending it the dialer's public key let (mut sink, _) = context.dial(socket).await.unwrap(); - if let Err(err) = send_frame(&mut sink, &dialer, max_size).await { + if let Err(err) = send_frame(&mut sink, dialer.as_ref(), max_size).await { error!(?err, "failed to send public key to listener"); return; } @@ -1261,11 +1265,8 @@ impl Link { context.sleep_until(receive_complete_at).await; // Send the message - let mut data = bytes::BytesMut::with_capacity(Channel::SIZE + message.len()); - data.extend_from_slice(&channel.to_be_bytes()); - data.extend_from_slice(&message); - let data = data.freeze(); - let _ = send_frame(&mut sink, &data, max_size).await; + let data = Bytes::from_owner(channel.to_be_bytes()).chain(message); + let _ = send_frame(&mut sink, data, max_size).await; // Bump received messages metric received_messages diff --git a/p2p/src/utils/codec.rs b/p2p/src/utils/codec.rs index 37d06497cf..38b5661385 100644 --- a/p2p/src/utils/codec.rs +++ b/p2p/src/utils/codec.rs @@ -43,9 +43,7 @@ impl WrappedSender { priority: bool, ) -> Result, as CheckedSender>::Error> { let encoded = message.encode(); - self.sender - .send(recipients, encoded.freeze(), priority) - .await + self.sender.send(recipients, encoded, priority).await } /// Check if a message can be sent to a set of recipients, returning a [CheckedWrappedSender] @@ -77,7 +75,7 @@ impl<'a, S: Sender, V: Codec> CheckedWrappedSender<'a, S, V> { priority: bool, ) -> Result, as CheckedSender>::Error> { let encoded = message.encode(); - self.sender.send(encoded.freeze(), priority).await + self.sender.send(encoded, priority).await } } diff --git a/p2p/src/utils/limited.rs b/p2p/src/utils/limited.rs index 362fd4ec7c..2eb3025fc8 100644 --- a/p2p/src/utils/limited.rs +++ b/p2p/src/utils/limited.rs @@ -1,7 +1,7 @@ //! Rate-limited [`UnlimitedSender`] wrapper. use crate::{Recipients, UnlimitedSender}; -use bytes::Bytes; +use bytes::Buf; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, KeyedRateLimiter, Quota}; use commonware_utils::channels::ring; @@ -203,7 +203,7 @@ impl<'a, S: UnlimitedSender> crate::CheckedSender for CheckedSender<'a, S> { async fn send( self, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> Result, Self::Error> { self.sender.send(self.recipients, message, priority).await @@ -251,7 +251,7 @@ mod tests { async fn send( &mut self, recipients: Recipients, - message: Bytes, + mut message: impl Buf + Send, priority: bool, ) -> Result, Self::Error> { let sent_to = match &recipients { @@ -259,6 +259,7 @@ mod tests { Recipients::Some(pks) => pks.clone(), Recipients::All => Vec::new(), }; + let message = message.copy_to_bytes(message.remaining()); self.sent.lock().await.push((recipients, message, priority)); Ok(sent_to) } diff --git a/p2p/src/utils/mux.rs b/p2p/src/utils/mux.rs index e5c2ece81e..a4a19d33dc 100644 --- a/p2p/src/utils/mux.rs +++ b/p2p/src/utils/mux.rs @@ -9,8 +9,8 @@ //! even if the muxer is already running. use crate::{Channel, CheckedSender, LimitedSender, Message, Receiver, Recipients, Sender}; -use bytes::{BufMut, Bytes, BytesMut}; -use commonware_codec::{varint::UInt, EncodeSize, Error as CodecError, ReadExt, Write}; +use bytes::{Buf, Bytes}; +use commonware_codec::{varint::UInt, Encode, Error as CodecError, ReadExt}; use commonware_macros::select_loop; use commonware_runtime::{spawn_cell, ContextCell, Handle, Spawner}; use futures::{ @@ -301,7 +301,7 @@ impl GlobalSender { &mut self, subchannel: Channel, recipients: Recipients, - payload: Bytes, + payload: impl Buf + Send, priority: bool, ) -> Result, as CheckedSender>::Error> { match self.check(recipients).await { @@ -354,14 +354,13 @@ impl<'a, S: Sender> CheckedSender for CheckedGlobalSender<'a, S> { async fn send( self, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> Result, Self::Error> { let subchannel = UInt(self.subchannel.expect("subchannel not set")); - let mut buf = BytesMut::with_capacity(subchannel.encode_size() + message.len()); - subchannel.write(&mut buf); - buf.put_slice(&message); - self.inner.send(buf.freeze(), priority).await + self.inner + .send(subchannel.encode().chain(message), priority) + .await } } @@ -870,12 +869,7 @@ mod tests { assert_eq!(subchannel, 1); assert_eq!(from, pk1); global_sender2 - .send( - subchannel, - Recipients::One(pk1), - b"TEST".to_vec().into(), - true, - ) + .send(subchannel, Recipients::One(pk1), &b"TEST"[..], true) .await .unwrap(); diff --git a/resolver/src/p2p/fetcher.rs b/resolver/src/p2p/fetcher.rs index 3daccb3162..0ded74af27 100644 --- a/resolver/src/p2p/fetcher.rs +++ b/resolver/src/p2p/fetcher.rs @@ -573,7 +573,7 @@ where mod tests { use super::*; use crate::p2p::mocks::Key as MockKey; - use bytes::Bytes; + use bytes::Buf; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, Signer, @@ -610,7 +610,7 @@ mod tests { async fn send( self, - message: Bytes, + message: impl Buf + Send, priority: bool, ) -> Result, Self::Error> { self.sender.send(self.recipients, message, priority).await @@ -627,7 +627,7 @@ mod tests { async fn send( &mut self, _recipients: Recipients, - _message: Bytes, + _message: impl Buf + Send, _priority: bool, ) -> Result, Self::Error> { Ok(vec![]) @@ -664,7 +664,7 @@ mod tests { async fn send( &mut self, recipients: Recipients, - _message: Bytes, + _message: impl Buf + Send, _priority: bool, ) -> Result, Self::Error> { match recipients { diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 2720ee5455..e500e6fca5 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -22,6 +22,7 @@ html_favicon_url = "https://commonware.xyz/favicon.ico" )] +use bytes::{Buf, BufMut}; use commonware_macros::select; use commonware_utils::StableBuf; use prometheus_client::registry::Metric; @@ -475,10 +476,7 @@ pub trait Sink: Sync + Send + 'static { /// # Warning /// /// If the sink returns an error, part of the message may still be delivered. - fn send( - &mut self, - msg: impl Into + Send, - ) -> impl Future> + Send; + fn send(&mut self, msg: impl Buf + Send) -> impl Future> + Send; } /// Interface that any runtime must implement to receive @@ -490,10 +488,7 @@ pub trait Stream: Sync + Send + 'static { /// # Warning /// /// If the stream returns an error, partially read data may be discarded. - fn recv( - &mut self, - buf: impl Into + Send, - ) -> impl Future> + Send; + fn recv(&mut self, buf: impl BufMut + Send) -> impl Future> + Send; } /// Interface to interact with storage. @@ -2735,7 +2730,8 @@ mod tests { async fn read_line(stream: &mut St) -> Result { let mut line = Vec::new(); loop { - let byte = stream.recv(vec![0; 1]).await?; + let mut byte = [0u8; 1]; + stream.recv(&mut byte[..]).await?; if byte[0] == b'\n' { if line.last() == Some(&b'\r') { line.pop(); // Remove trailing \r @@ -2768,8 +2764,9 @@ mod tests { stream: &mut St, content_length: usize, ) -> Result { - let read = stream.recv(vec![0; content_length]).await?; - String::from_utf8(read.into()).map_err(|_| Error::ReadFailed) + let mut read = vec![0; content_length]; + stream.recv(&mut read[..]).await?; + String::from_utf8(read).map_err(|_| Error::ReadFailed) } // Simulate a client connecting to the server @@ -2791,7 +2788,7 @@ mod tests { let request = format!( "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n" ); - sink.send(Bytes::from(request).to_vec()).await.unwrap(); + sink.send(Bytes::from(request)).await.unwrap(); // Read and verify the HTTP status line let status_line = read_line(&mut stream).await.unwrap(); diff --git a/runtime/src/mocks.rs b/runtime/src/mocks.rs index 05fa0a27ca..be4c48e73a 100644 --- a/runtime/src/mocks.rs +++ b/runtime/src/mocks.rs @@ -1,18 +1,14 @@ //! A mock implementation of a channel that implements the Sink and Stream traits. use crate::{Error, Sink as SinkTrait, Stream as StreamTrait}; -use bytes::Bytes; -use commonware_utils::StableBuf; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures::channel::oneshot; -use std::{ - collections::VecDeque, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; /// A mock channel struct that is used internally by Sink and Stream. pub struct Channel { /// Stores the bytes sent by the sink that are not yet read by the stream. - buffer: VecDeque, + buffer: BytesMut, /// If the stream is waiting to read bytes, the waiter stores the number of /// bytes that the stream is waiting for, as well as the oneshot sender that @@ -30,7 +26,7 @@ impl Channel { /// Returns an async-safe Sink/Stream pair that share an underlying buffer of bytes. pub fn init() -> (Sink, Stream) { let channel = Arc::new(Mutex::new(Self { - buffer: VecDeque::new(), + buffer: BytesMut::new(), waiter: None, sink_alive: true, stream_alive: true, @@ -50,8 +46,7 @@ pub struct Sink { } impl SinkTrait for Sink { - async fn send(&mut self, msg: impl Into + Send) -> Result<(), Error> { - let msg = msg.into(); + async fn send(&mut self, buf: impl Buf + Send) -> Result<(), Error> { let (os_send, data) = { let mut channel = self.channel.lock().unwrap(); @@ -60,8 +55,7 @@ impl SinkTrait for Sink { return Err(Error::Closed); } - // Add the data to the buffer. - channel.buffer.extend(msg.as_ref()); + channel.buffer.put(buf); // If there is a waiter and the buffer is large enough, // return the waiter (while clearing the waiter field). @@ -72,8 +66,8 @@ impl SinkTrait for Sink { .is_some_and(|(requested, _)| *requested <= channel.buffer.len()) { let (requested, os_send) = channel.waiter.take().unwrap(); - let data: Vec = channel.buffer.drain(0..requested).collect(); - (os_send, Bytes::from(data)) + let data = channel.buffer.copy_to_bytes(requested); + (os_send, data) } else { return Ok(()); } @@ -101,17 +95,16 @@ pub struct Stream { } impl StreamTrait for Stream { - async fn recv(&mut self, buf: impl Into + Send) -> Result { - let mut buf = buf.into(); + async fn recv(&mut self, mut buf: impl BufMut + Send) -> Result<(), Error> { let os_recv = { let mut channel = self.channel.lock().unwrap(); // If the message is fully available in the buffer, // drain the value into buf and return. - if channel.buffer.len() >= buf.len() { - let b: Vec = channel.buffer.drain(0..buf.len()).collect(); + if channel.buffer.len() >= buf.remaining_mut() { + let b = channel.buffer.copy_to_bytes(buf.remaining_mut()); buf.put_slice(&b); - return Ok(buf); + return Ok(()); } // At this point, there is not enough data in the buffer. @@ -123,16 +116,16 @@ impl StreamTrait for Stream { // Otherwise, populate the waiter. assert!(channel.waiter.is_none()); let (os_send, os_recv) = oneshot::channel(); - channel.waiter = Some((buf.len(), os_send)); + channel.waiter = Some((buf.remaining_mut(), os_send)); os_recv }; // Wait for the waiter to be resolved. // If the oneshot sender was dropped, it means the sink is closed. let data = os_recv.await.map_err(|_| Error::Closed)?; - assert_eq!(data.len(), buf.len()); + assert_eq!(data.len(), buf.remaining_mut()); buf.put_slice(&data); - Ok(buf) + Ok(()) } } @@ -153,32 +146,35 @@ mod tests { #[test] fn test_send_recv() { let (mut sink, mut stream) = Channel::init(); - let data = b"hello world".to_vec(); + let data = b"hello world"; let executor = deterministic::Runner::default(); executor.start(|_| async move { - sink.send(data.clone()).await.unwrap(); - let buf = stream.recv(vec![0; data.len()]).await.unwrap(); - assert_eq!(buf.as_ref(), data); + sink.send(data.as_slice()).await.unwrap(); + let mut buf = vec![0u8; data.len()]; + stream.recv(&mut buf[..]).await.unwrap(); + assert_eq!(&buf[..], data); }); } #[test] fn test_send_recv_partial_multiple() { let (mut sink, mut stream) = Channel::init(); - let data = b"hello".to_vec(); - let data2 = b" world".to_vec(); + let data = b"hello"; + let data2 = b" world"; let executor = deterministic::Runner::default(); executor.start(|_| async move { - sink.send(data).await.unwrap(); - sink.send(data2).await.unwrap(); - let buf = stream.recv(vec![0; 5]).await.unwrap(); - assert_eq!(buf.as_ref(), b"hello"); - let buf = stream.recv(buf).await.unwrap(); - assert_eq!(buf.as_ref(), b" worl"); - let buf = stream.recv(vec![0; 1]).await.unwrap(); - assert_eq!(buf.as_ref(), b"d"); + sink.send(data.as_slice()).await.unwrap(); + sink.send(data2.as_slice()).await.unwrap(); + let mut buf = [0u8; 5]; + stream.recv(&mut buf[..]).await.unwrap(); + assert_eq!(&buf[..], b"hello"); + stream.recv(&mut buf[..]).await.unwrap(); + assert_eq!(&buf[..], b" worl"); + let mut buf = [0u8; 1]; + stream.recv(&mut buf[..]).await.unwrap(); + assert_eq!(&buf[..], b"d"); }); } @@ -189,12 +185,13 @@ mod tests { let executor = deterministic::Runner::default(); executor.start(|_| async move { - let (buf, _) = futures::try_join!(stream.recv(vec![0; data.len()]), async { + let mut buf = vec![0; data.len()]; + let (_, _) = futures::try_join!(stream.recv(&mut buf[..]), async { sleep(Duration::from_millis(50)); - sink.send(data.to_vec()).await + sink.send(data.as_slice()).await }) .unwrap(); - assert_eq!(buf.as_ref(), data); + assert_eq!(&buf[..], data); }); } @@ -206,7 +203,8 @@ mod tests { executor.start(|context| async move { futures::join!( async { - let result = stream.recv(vec![0; 5]).await; + let mut buf = [0u8; 5]; + let result = stream.recv(&mut buf[..]).await; assert!(matches!(result, Err(Error::Closed))); }, async { @@ -225,7 +223,8 @@ mod tests { let executor = deterministic::Runner::default(); executor.start(|_| async move { - let result = stream.recv(vec![0; 5]).await; + let mut buf = [0u8; 5]; + let result = stream.recv(&mut buf[..]).await; assert!(matches!(result, Err(Error::Closed))); }); } @@ -237,12 +236,13 @@ mod tests { let executor = deterministic::Runner::default(); executor.start(|context| async move { // Send some bytes - assert!(sink.send(b"7 bytes".to_vec()).await.is_ok()); + assert!(sink.send(b"7 bytes".as_slice()).await.is_ok()); // Spawn a task to initiate recv's where the first one will succeed and then will drop. let handle = context.clone().spawn(|_| async move { - let _ = stream.recv(vec![0; 5]).await; - let _ = stream.recv(vec![0; 5]).await; + let mut buf = [0u8; 5]; + let _ = stream.recv(&mut buf[..]).await; + let _ = stream.recv(&mut buf[..]).await; }); // Give the async task a moment to start @@ -253,7 +253,7 @@ mod tests { assert!(matches!(handle.await, Err(Error::Closed))); // Try to send a message. The stream is dropped, so this should fail. - let result = sink.send(b"hello world".to_vec()).await; + let result = sink.send(b"hello world".as_slice()).await; assert!(matches!(result, Err(Error::Closed))); }); } @@ -265,7 +265,7 @@ mod tests { let executor = deterministic::Runner::default(); executor.start(|_| async move { - let result = sink.send(b"hello world".to_vec()).await; + let result = sink.send(b"hello world".as_slice()).await; assert!(matches!(result, Err(Error::Closed))); }); } @@ -278,8 +278,9 @@ mod tests { // The timeout should return first. let executor = deterministic::Runner::default(); executor.start(|context| async move { + let mut buf = [0u8; 5]; select! { - v = stream.recv(vec![0;5]) => { + v = stream.recv(&mut buf[..]) => { panic!("unexpected value: {v:?}"); }, _ = context.sleep(Duration::from_millis(100)) => { diff --git a/runtime/src/network/audited.rs b/runtime/src/network/audited.rs index ae78eee4e8..cad20fb949 100644 --- a/runtime/src/network/audited.rs +++ b/runtime/src/network/audited.rs @@ -1,5 +1,5 @@ use crate::{deterministic::Auditor, Error, SinkOf, StreamOf}; -use commonware_utils::StableBuf; +use bytes::{Buf, BufMut}; use sha2::Digest; use std::{net::SocketAddr, sync::Arc}; @@ -11,14 +11,14 @@ pub struct Sink { } impl crate::Sink for Sink { - async fn send(&mut self, data: impl Into + Send) -> Result<(), Error> { - let data = data.into(); + async fn send(&mut self, mut buf: impl Buf + Send) -> Result<(), Error> { + let bytes = buf.copy_to_bytes(buf.remaining()); self.auditor.event(b"send_attempt", |hasher| { hasher.update(self.remote_addr.to_string().as_bytes()); - hasher.update(data.as_ref()); + hasher.update(&bytes); }); - self.inner.send(data).await.inspect_err(|e| { + self.inner.send(bytes).await.inspect_err(|e| { self.auditor.event(b"send_failure", |hasher| { hasher.update(self.remote_addr.to_string().as_bytes()); hasher.update(e.to_string().as_bytes()); @@ -40,12 +40,16 @@ pub struct Stream { } impl crate::Stream for Stream { - async fn recv(&mut self, buf: impl Into + Send) -> Result { + async fn recv(&mut self, mut buf: impl BufMut + Send) -> Result<(), Error> { + // Create an intermediate buffer to capture data for auditing + let len = buf.remaining_mut(); + let mut temp = vec![0u8; len]; + self.auditor.event(b"recv_attempt", |hasher| { hasher.update(self.remote_addr.to_string().as_bytes()); }); - let buf = self.inner.recv(buf).await.inspect_err(|e| { + self.inner.recv(&mut temp[..]).await.inspect_err(|e| { self.auditor.event(b"recv_failure", |hasher| { hasher.update(self.remote_addr.to_string().as_bytes()); hasher.update(e.to_string().as_bytes()); @@ -54,9 +58,11 @@ impl crate::Stream for Stream { self.auditor.event(b"recv_success", |hasher| { hasher.update(self.remote_addr.to_string().as_bytes()); - hasher.update(buf.as_ref()); + hasher.update(&temp); }); - Ok(buf) + + buf.put_slice(&temp); + Ok(()) } } @@ -257,11 +263,12 @@ mod tests { let (_, mut sink, mut stream) = listener.accept().await.unwrap(); // Receive data from client - let buf = stream.recv(vec![0; CLIENT_MSG.len()]).await.unwrap(); - assert_eq!(buf.as_ref(), CLIENT_MSG.as_bytes()); + let mut buf = vec![0; CLIENT_MSG.len()]; + stream.recv(&mut buf[..]).await.unwrap(); + assert_eq!(&buf[..], CLIENT_MSG.as_bytes()); // Send response - sink.send(Vec::from(SERVER_MSG)).await.unwrap(); + sink.send(SERVER_MSG.as_bytes()).await.unwrap(); }); server_handles.push(handle); } @@ -275,11 +282,12 @@ mod tests { let (mut sink, mut stream) = network.dial(listener_addr).await.unwrap(); // Send data to server - sink.send(Vec::from(CLIENT_MSG)).await.unwrap(); + sink.send(CLIENT_MSG.as_bytes()).await.unwrap(); // Receive response - let buf = stream.recv(vec![0; SERVER_MSG.len()]).await.unwrap(); - assert_eq!(buf.as_ref(), SERVER_MSG.as_bytes()); + let mut buf = vec![0; SERVER_MSG.len()]; + stream.recv(&mut buf[..]).await.unwrap(); + assert_eq!(&buf[..], SERVER_MSG.as_bytes()); }); client_handles.push(handle); } diff --git a/runtime/src/network/deterministic.rs b/runtime/src/network/deterministic.rs index 60634c297a..5651884fca 100644 --- a/runtime/src/network/deterministic.rs +++ b/runtime/src/network/deterministic.rs @@ -1,4 +1,5 @@ -use crate::{mocks, Error, StableBuf}; +use crate::{mocks, Error}; +use bytes::{Buf, BufMut}; use futures::{channel::mpsc, SinkExt as _, StreamExt as _}; use std::{ collections::HashMap, @@ -16,7 +17,7 @@ pub struct Sink { } impl crate::Sink for Sink { - async fn send(&mut self, msg: impl Into + Send) -> Result<(), Error> { + async fn send(&mut self, msg: impl Buf + Send) -> Result<(), Error> { self.sender.send(msg).await.map_err(|_| Error::SendFailed) } } @@ -27,7 +28,7 @@ pub struct Stream { } impl crate::Stream for Stream { - async fn recv(&mut self, buf: impl Into + Send) -> Result { + async fn recv(&mut self, buf: impl BufMut + Send) -> Result<(), Error> { self.receiver.recv(buf).await.map_err(|_| Error::RecvFailed) } } diff --git a/runtime/src/network/iouring.rs b/runtime/src/network/iouring.rs index bdd0c88f2f..617b5bad5c 100644 --- a/runtime/src/network/iouring.rs +++ b/runtime/src/network/iouring.rs @@ -23,6 +23,7 @@ //! This implementation is only available on Linux systems that support io_uring. use crate::iouring::{self, should_retry}; +use bytes::{Buf, BufMut, BytesMut}; use commonware_utils::StableBuf; use futures::{ channel::{mpsc, oneshot}, @@ -240,8 +241,12 @@ impl Sink { } impl crate::Sink for Sink { - async fn send(&mut self, msg: impl Into + Send) -> Result<(), crate::Error> { - let mut msg = msg.into(); + async fn send(&mut self, mut msg: impl Buf + Send) -> Result<(), crate::Error> { + // TODO(#2705): Use writev to avoid this copy. + let mut msg: StableBuf = { + let buf = msg.copy_to_bytes(msg.remaining()); + BytesMut::from(buf).into() + }; let mut bytes_sent = 0; let msg_len = msg.len(); @@ -404,17 +409,17 @@ impl Stream { } impl crate::Stream for Stream { - async fn recv(&mut self, buf: impl Into + Send) -> Result { - let mut buf = buf.into(); + async fn recv(&mut self, mut buf: impl BufMut + Send) -> Result<(), crate::Error> { + let mut owned_buf: StableBuf = BytesMut::zeroed(buf.remaining_mut()).into(); let mut bytes_received = 0; - let buf_len = buf.len(); + let buf_len = owned_buf.len(); while bytes_received < buf_len { // First drain any buffered data let buffered = self.buffer_len - self.buffer_pos; if buffered > 0 { let to_copy = std::cmp::min(buffered, buf_len - bytes_received); - buf.as_mut()[bytes_received..bytes_received + to_copy] + owned_buf.as_mut()[bytes_received..bytes_received + to_copy] .copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + to_copy]); self.buffer_pos += to_copy; bytes_received += to_copy; @@ -427,8 +432,9 @@ impl crate::Stream for Stream { // to fill the buffer and immediately drain it let buffer_len = self.buffer.len(); if buffer_len == 0 || remaining >= buffer_len { - let (returned_buf, result) = self.submit_recv(buf, bytes_received, remaining).await; - buf = returned_buf; + let (returned_buf, result) = + self.submit_recv(owned_buf, bytes_received, remaining).await; + owned_buf = returned_buf; bytes_received += result?; } else { // Fill internal buffer, then loop will copy @@ -436,7 +442,8 @@ impl crate::Stream for Stream { } } - Ok(buf) + buf.put_slice(owned_buf.as_ref()); + Ok(()) } } @@ -516,20 +523,21 @@ mod tests { let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); // Read a small message (much smaller than the 64KB buffer) - let buf = stream.recv(vec![0u8; 10]).await.unwrap(); + let mut buf = [0u8; 10]; + stream.recv(&mut buf[..]).await.unwrap(); buf }); // Connect and send a small message let (mut sink, _stream) = network.dial(addr).await.unwrap(); let msg = vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10]; - sink.send(msg.clone()).await.unwrap(); + sink.send(msg.as_ref()).await.unwrap(); // Wait for the reader to complete let received = reader.await.unwrap(); // Verify we got the right data - assert_eq!(received.as_ref(), &msg[..]); + assert_eq!(received.as_slice(), &msg[..]); } #[tokio::test] @@ -561,7 +569,8 @@ mod tests { // Try to read 100 bytes, but only 5 will be sent let start = Instant::now(); - let result = stream.recv(vec![0u8; 100]).await; + let mut buf = [0u8; 100]; + let result = stream.recv(&mut buf[..]).await; let elapsed = start.elapsed(); (result, elapsed) @@ -569,7 +578,7 @@ mod tests { // Connect and send only partial data let (mut sink, _stream) = network.dial(addr).await.unwrap(); - sink.send(vec![1u8, 2, 3, 4, 5]).await.unwrap(); + sink.send([1u8, 2, 3, 4, 5].as_slice()).await.unwrap(); // Wait for the reader to complete let (result, elapsed) = reader.await.unwrap(); @@ -608,21 +617,23 @@ mod tests { let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); // Read messages without buffering - let buf1 = stream.recv(vec![0u8; 5]).await.unwrap(); - let buf2 = stream.recv(vec![0u8; 5]).await.unwrap(); + let mut buf1 = [0u8; 5]; + let mut buf2 = [0u8; 5]; + stream.recv(&mut buf1[..]).await.unwrap(); + stream.recv(&mut buf2[..]).await.unwrap(); (buf1, buf2) }); // Connect and send two messages let (mut sink, _stream) = network.dial(addr).await.unwrap(); - sink.send(vec![1u8, 2, 3, 4, 5]).await.unwrap(); - sink.send(vec![6u8, 7, 8, 9, 10]).await.unwrap(); + sink.send([1u8, 2, 3, 4, 5].as_slice()).await.unwrap(); + sink.send([6u8, 7, 8, 9, 10].as_slice()).await.unwrap(); // Wait for the reader to complete let (buf1, buf2) = reader.await.unwrap(); // Verify we got the right data - assert_eq!(buf1.as_ref(), &[1u8, 2, 3, 4, 5]); - assert_eq!(buf2.as_ref(), &[6u8, 7, 8, 9, 10]); + assert_eq!(buf1.as_slice(), &[1u8, 2, 3, 4, 5]); + assert_eq!(buf2.as_slice(), &[6u8, 7, 8, 9, 10]); } } diff --git a/runtime/src/network/metered.rs b/runtime/src/network/metered.rs index 4e17117f40..56e6291667 100644 --- a/runtime/src/network/metered.rs +++ b/runtime/src/network/metered.rs @@ -1,5 +1,5 @@ use crate::{SinkOf, StreamOf}; -use commonware_utils::StableBuf; +use bytes::{Buf, BufMut}; use prometheus_client::{metrics::counter::Counter, registry::Registry}; use std::{net::SocketAddr, sync::Arc}; @@ -55,9 +55,8 @@ pub struct Sink { } impl crate::Sink for Sink { - async fn send(&mut self, data: impl Into + Send) -> Result<(), crate::Error> { - let data = data.into(); - let len = data.len(); + async fn send(&mut self, data: impl Buf + Send) -> Result<(), crate::Error> { + let len = data.remaining(); self.inner.send(data).await?; self.metrics.outbound_bandwidth.inc_by(len as u64); Ok(()) @@ -71,10 +70,11 @@ pub struct Stream { } impl crate::Stream for Stream { - async fn recv(&mut self, buf: impl Into + Send) -> Result { - let buf = self.inner.recv(buf).await?; - self.metrics.inbound_bandwidth.inc_by(buf.len() as u64); - Ok(buf) + async fn recv(&mut self, buf: impl BufMut + Send) -> Result<(), crate::Error> { + let size = buf.remaining_mut(); + self.inner.recv(buf).await?; + self.metrics.inbound_bandwidth.inc_by(size as u64); + Ok(()) } } @@ -217,8 +217,9 @@ mod tests { // Create a server task that accepts one connection and echoes data let server = tokio::spawn(async move { let (_, mut sink, mut stream) = listener.accept().await.unwrap(); - let buf = stream.recv(vec![0; MSG_SIZE as usize]).await.unwrap(); - sink.send(buf).await.unwrap(); + let mut buf = vec![0; MSG_SIZE as usize]; + stream.recv(&mut buf[..]).await.unwrap(); + sink.send(&buf[..]).await.unwrap(); }); // Send and receive data as client @@ -226,14 +227,12 @@ mod tests { // Send fixed-size data and receive response let msg = vec![42u8; MSG_SIZE as usize]; - client_sink.send(msg.clone()).await.unwrap(); + client_sink.send(msg.as_slice()).await.unwrap(); - let response = client_stream - .recv(vec![0; MSG_SIZE as usize]) - .await - .unwrap(); + let mut response = vec![0u8; MSG_SIZE as usize]; + client_stream.recv(&mut response[..]).await.unwrap(); assert_eq!(response.len(), MSG_SIZE as usize); - assert_eq!(response.as_ref(), msg); + assert_eq!(&response[..], &msg[..]); // Wait for server to complete server.await.unwrap(); diff --git a/runtime/src/network/mod.rs b/runtime/src/network/mod.rs index b20a1a8d50..7a24184c1c 100644 --- a/runtime/src/network/mod.rs +++ b/runtime/src/network/mod.rs @@ -45,12 +45,10 @@ mod tests { let server = runtime.spawn(async move { let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept"); - let read = stream - .recv(vec![0; CLIENT_SEND_DATA.len()]) - .await - .expect("Failed to receive"); - assert_eq!(read.as_ref(), CLIENT_SEND_DATA.as_bytes()); - sink.send(Vec::from(SERVER_SEND_DATA)) + let mut read = vec![0; CLIENT_SEND_DATA.len()]; + stream.recv(&mut read[..]).await.expect("Failed to receive"); + assert_eq!(&read[..], CLIENT_SEND_DATA.as_bytes()); + sink.send(SERVER_SEND_DATA.as_bytes()) .await .expect("Failed to send"); }); @@ -63,15 +61,16 @@ mod tests { .await .expect("Failed to dial server"); - sink.send(Vec::from(CLIENT_SEND_DATA)) + sink.send(CLIENT_SEND_DATA.as_bytes()) .await .expect("Failed to send data"); - let read = stream - .recv(vec![0; SERVER_SEND_DATA.len()]) + let mut read = vec![0; SERVER_SEND_DATA.len()]; + stream + .recv(&mut read[..]) .await .expect("Failed to receive data"); - assert_eq!(read.as_ref(), SERVER_SEND_DATA.as_bytes()); + assert_eq!(&read[..], SERVER_SEND_DATA.as_bytes()); }); // Wait for both tasks to complete @@ -97,13 +96,11 @@ mod tests { for _ in 0..3 { let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept"); - let read = stream - .recv(vec![0; CLIENT_SEND_DATA.len()]) - .await - .expect("Failed to receive"); - assert_eq!(read.as_ref(), CLIENT_SEND_DATA.as_bytes()); + let mut read = vec![0; CLIENT_SEND_DATA.len()]; + stream.recv(&mut read[..]).await.expect("Failed to receive"); + assert_eq!(&read[..], CLIENT_SEND_DATA.as_bytes()); - sink.send(Vec::from(SERVER_SEND_DATA)) + sink.send(SERVER_SEND_DATA.as_bytes()) .await .expect("Failed to send"); } @@ -119,17 +116,18 @@ mod tests { .expect("Failed to dial server"); // Send a message to the server - sink.send(Vec::from(CLIENT_SEND_DATA)) + sink.send(CLIENT_SEND_DATA.as_bytes()) .await .expect("Failed to send data"); // Receive a message from the server - let read = stream - .recv(vec![0; SERVER_SEND_DATA.len()]) + let mut read = vec![0; SERVER_SEND_DATA.len()]; + stream + .recv(&mut read[..]) .await .expect("Failed to receive data"); // Verify the received data - assert_eq!(read.as_ref(), SERVER_SEND_DATA.as_bytes()); + assert_eq!(&read[..], SERVER_SEND_DATA.as_bytes()); } }); @@ -156,11 +154,12 @@ mod tests { // Receive and echo large data in chunks for _ in 0..NUM_CHUNKS { - let read = stream - .recv(vec![0; CHUNK_SIZE]) + let mut read = vec![0; CHUNK_SIZE]; + stream + .recv(&mut read[..]) .await .expect("Failed to receive chunk"); - sink.send(read).await.expect("Failed to send chunk"); + sink.send(&read[..]).await.expect("Failed to send chunk"); } }); @@ -177,14 +176,13 @@ mod tests { // Send and verify data in chunks for _ in 0..NUM_CHUNKS { - sink.send(pattern.clone()) - .await - .expect("Failed to send chunk"); - let read = stream - .recv(vec![0; CHUNK_SIZE]) + sink.send(&pattern[..]).await.expect("Failed to send chunk"); + let mut read = vec![0; CHUNK_SIZE]; + stream + .recv(&mut read[..]) .await .expect("Failed to receive chunk"); - assert_eq!(read.as_ref(), pattern); + assert_eq!(&read[..], &pattern[..]); } }); @@ -240,8 +238,9 @@ mod tests { let (_, mut sink, mut stream) = listener.accept().await.unwrap(); tokio::spawn(async move { for _ in 0..NUM_MESSAGES { - let data = stream.recv(vec![0; MESSAGE_SIZE]).await.unwrap(); - sink.send(data).await.unwrap(); + let mut data = vec![0; MESSAGE_SIZE]; + stream.recv(&mut data[..]).await.unwrap(); + sink.send(&data[..]).await.unwrap(); } }); } @@ -255,9 +254,10 @@ mod tests { let (mut sink, mut stream) = network.dial(addr).await.unwrap(); let payload = vec![42u8; MESSAGE_SIZE]; for _ in 0..NUM_MESSAGES { - sink.send(payload.clone()).await.unwrap(); - let echo = stream.recv(vec![0; MESSAGE_SIZE]).await.unwrap(); - assert_eq!(echo.as_ref(), payload); + sink.send(&payload[..]).await.unwrap(); + let mut echo = vec![0; MESSAGE_SIZE]; + stream.recv(&mut echo[..]).await.unwrap(); + assert_eq!(&echo[..], &payload[..]); } })); } diff --git a/runtime/src/network/tokio.rs b/runtime/src/network/tokio.rs index 990b59e5e4..b14eb580d5 100644 --- a/runtime/src/network/tokio.rs +++ b/runtime/src/network/tokio.rs @@ -1,5 +1,5 @@ use crate::Error; -use commonware_utils::StableBuf; +use bytes::{Buf, BufMut}; use std::{net::SocketAddr, time::Duration}; use tokio::{ io::{AsyncReadExt as _, AsyncWriteExt as _, BufReader}, @@ -18,9 +18,9 @@ pub struct Sink { } impl crate::Sink for Sink { - async fn send(&mut self, msg: impl Into + Send) -> Result<(), Error> { + async fn send(&mut self, mut msg: impl Buf + Send) -> Result<(), Error> { // Time out if we take too long to write - timeout(self.write_timeout, self.sink.write_all(msg.into().as_ref())) + timeout(self.write_timeout, self.sink.write_all_buf(&mut msg)) .await .map_err(|_| Error::Timeout)? .map_err(|_| Error::SendFailed)?; @@ -38,19 +38,30 @@ pub struct Stream { } impl crate::Stream for Stream { - async fn recv(&mut self, buf: impl Into + Send) -> Result { - let mut buf = buf.into(); - if buf.is_empty() { - return Ok(buf); - } + async fn recv(&mut self, mut buf: impl BufMut + Send) -> Result<(), Error> { + let read_fut = async { + let mut read = 0; + let len = buf.remaining_mut(); + while read < len { + let n = self + .stream + .read_buf(&mut buf) + .await + .map_err(|_| Error::RecvFailed)?; + + if n == 0 { + return Err(Error::RecvFailed); + } + + read += n; + } + Ok(()) + }; // Time out if we take too long to read - timeout(self.read_timeout, self.stream.read_exact(buf.as_mut())) + timeout(self.read_timeout, read_fut) .await .map_err(|_| Error::Timeout)? - .map_err(|_| Error::RecvFailed)?; - - Ok(buf) } } @@ -290,7 +301,8 @@ mod tests { // Read a small message (much smaller than the 64KB buffer) let start = Instant::now(); - let buf = stream.recv(vec![0u8; 10]).await.unwrap(); + let mut buf = vec![0u8; 10]; + stream.recv(&mut buf[..]).await.unwrap(); let elapsed = start.elapsed(); (buf, elapsed) @@ -299,13 +311,13 @@ mod tests { // Connect and send a small message let (mut sink, _stream) = network.dial(addr).await.unwrap(); let msg = vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10]; - sink.send(msg.clone()).await.unwrap(); + sink.send(msg.as_slice()).await.unwrap(); // Wait for the reader to complete let (received, elapsed) = reader.await.unwrap(); // Verify we got the right data - assert_eq!(received.as_ref(), &msg[..]); + assert_eq!(received, &msg[..]); // Verify it completed quickly (well under the read timeout) // Should complete in milliseconds, not seconds @@ -331,7 +343,8 @@ mod tests { // Try to read 100 bytes, but only 5 will be sent let start = Instant::now(); - let result = stream.recv(vec![0u8; 100]).await; + let mut buf = [0u8; 100]; + let result = stream.recv(&mut buf[..]).await; let elapsed = start.elapsed(); (result, elapsed) @@ -339,7 +352,7 @@ mod tests { // Connect and send only partial data let (mut sink, _stream) = network.dial(addr).await.unwrap(); - sink.send(vec![1u8, 2, 3, 4, 5]).await.unwrap(); + sink.send([1u8, 2, 3, 4, 5].as_slice()).await.unwrap(); // Wait for the reader to complete let (result, elapsed) = reader.await.unwrap(); @@ -370,21 +383,23 @@ mod tests { let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); // Read messages without buffering - let buf1 = stream.recv(vec![0u8; 5]).await.unwrap(); - let buf2 = stream.recv(vec![0u8; 5]).await.unwrap(); + let mut buf1 = vec![0u8; 5]; + let mut buf2 = vec![0u8; 5]; + stream.recv(&mut buf1[..]).await.unwrap(); + stream.recv(&mut buf2[..]).await.unwrap(); (buf1, buf2) }); // Connect and send two messages let (mut sink, _stream) = network.dial(addr).await.unwrap(); - sink.send(vec![1u8, 2, 3, 4, 5]).await.unwrap(); - sink.send(vec![6u8, 7, 8, 9, 10]).await.unwrap(); + sink.send([1u8, 2, 3, 4, 5].as_slice()).await.unwrap(); + sink.send([6u8, 7, 8, 9, 10].as_slice()).await.unwrap(); // Wait for the reader to complete let (buf1, buf2) = reader.await.unwrap(); // Verify we got the right data - assert_eq!(buf1.as_ref(), &[1u8, 2, 3, 4, 5]); - assert_eq!(buf2.as_ref(), &[6u8, 7, 8, 9, 10]); + assert_eq!(buf1.as_slice(), &[1u8, 2, 3, 4, 5]); + assert_eq!(buf2.as_slice(), &[6u8, 7, 8, 9, 10]); } } diff --git a/storage/src/bmt/mod.rs b/storage/src/bmt/mod.rs index 1513d78fc9..dc322e68cf 100644 --- a/storage/src/bmt/mod.rs +++ b/storage/src/bmt/mod.rs @@ -1045,7 +1045,7 @@ mod tests { // Generate a valid proof for leaf at index 1. let proof = tree.proof(1).unwrap(); - let mut serialized = proof.encode(); + let mut serialized = proof.encode_mut(); // Append an extra byte. serialized.extend_from_slice(&[0u8]); diff --git a/storage/src/freezer/storage.rs b/storage/src/freezer/storage.rs index 61a47e451b..51a0fd74a8 100644 --- a/storage/src/freezer/storage.rs +++ b/storage/src/freezer/storage.rs @@ -525,7 +525,7 @@ impl Freezer { // Write the new entry table - .write_at(update.encode(), table_offset + start) + .write_at(update.encode_mut(), table_offset + start) .await .map_err(Error::Runtime) } diff --git a/storage/src/metadata/storage.rs b/storage/src/metadata/storage.rs index 08f7f64ed9..d4cee20853 100644 --- a/storage/src/metadata/storage.rs +++ b/storage/src/metadata/storage.rs @@ -346,7 +346,7 @@ impl Metadata { let new_value = self.map.get(key).expect("key must exist"); if info.length == new_value.encode_size() { // Overwrite existing value - let encoded = new_value.encode(); + let encoded = new_value.encode_mut(); target.data[info.start..info.start + info.length].copy_from_slice(&encoded); writes.push(target.blob.write_at(encoded, info.start as u64)); } else { diff --git a/storage/src/mmr/proof.rs b/storage/src/mmr/proof.rs index 8ccb87ed3d..0481c6e112 100644 --- a/storage/src/mmr/proof.rs +++ b/storage/src/mmr/proof.rs @@ -1083,7 +1083,7 @@ mod tests { let proof = mmr.range_proof(range).unwrap(); let expected_size = proof.encode_size(); - let serialized_proof = proof.encode().freeze(); + let serialized_proof = proof.encode(); assert_eq!( serialized_proof.len(), expected_size, @@ -1098,7 +1098,7 @@ mod tests { // Remove one byte from the end of the serialized // proof and confirm it fails to deserialize. - let serialized_proof = proof.encode().freeze(); + let serialized_proof = proof.encode(); let serialized_proof: Bytes = serialized_proof.slice(0..serialized_proof.len() - 1); assert!( Proof::::decode_cfg(serialized_proof, &max_digests).is_err(), @@ -1107,9 +1107,9 @@ mod tests { // Add 1 byte of extra data to the end of the serialized // proof and confirm it fails to deserialize. - let mut serialized_proof = proof.encode(); + let mut serialized_proof = proof.encode_mut(); serialized_proof.extend_from_slice(&[0; 10]); - let serialized_proof = serialized_proof.freeze(); + let serialized_proof = serialized_proof; assert!( Proof::::decode_cfg(serialized_proof, &max_digests).is_err(), @@ -1118,7 +1118,7 @@ mod tests { // Confirm deserialization fails when max length is exceeded. if max_digests > 0 { - let serialized_proof = proof.encode().freeze(); + let serialized_proof = proof.encode(); assert!( Proof::::decode_cfg(serialized_proof, &(max_digests - 1)).is_err(), "proof should not deserialize with max length exceeded" diff --git a/storage/src/ordinal/storage.rs b/storage/src/ordinal/storage.rs index e58d3c621e..68d62d0539 100644 --- a/storage/src/ordinal/storage.rs +++ b/storage/src/ordinal/storage.rs @@ -267,7 +267,7 @@ impl> Ordinal { let blob = self.blobs.get(§ion).unwrap(); let offset = (index % items_per_blob) * Record::::SIZE as u64; let record = Record::new(value); - blob.write_at(record.encode(), offset).await?; + blob.write_at(record.encode_mut(), offset).await?; self.pending.insert(section); // Add to intervals diff --git a/storage/src/qmdb/any/operation/mod.rs b/storage/src/qmdb/any/operation/mod.rs index f1c03a8237..bc9981a445 100644 --- a/storage/src/qmdb/any/operation/mod.rs +++ b/storage/src/qmdb/any/operation/mod.rs @@ -125,11 +125,11 @@ mod tests { where T: Codec + PartialEq + std::fmt::Debug, { - let encoded = value.encode().freeze(); + let encoded = value.encode(); let decoded = T::decode_cfg(encoded.clone(), cfg).expect("decode"); assert_eq!(decoded, *value); let encoded2 = decoded.encode(); - assert_eq!(encoded, encoded2.freeze()); + assert_eq!(encoded, encoded2); } #[test] diff --git a/storage/src/qmdb/any/operation/update/mod.rs b/storage/src/qmdb/any/operation/update/mod.rs index 67975d8861..9666bb472d 100644 --- a/storage/src/qmdb/any/operation/update/mod.rs +++ b/storage/src/qmdb/any/operation/update/mod.rs @@ -36,11 +36,11 @@ mod tests { where T: Codec + PartialEq + fmt::Debug, { - let encoded = value.encode().freeze(); + let encoded = value.encode(); let decoded = T::decode_cfg(encoded.clone(), cfg).expect("decode"); assert_eq!(decoded, *value); let encoded2 = decoded.encode(); - assert_eq!(encoded, encoded2.freeze()); + assert_eq!(encoded, encoded2); } #[test] diff --git a/stream/fuzz/fuzz_targets/connection.rs b/stream/fuzz/fuzz_targets/connection.rs index c0ed37fa03..b50fd65bc7 100644 --- a/stream/fuzz/fuzz_targets/connection.rs +++ b/stream/fuzz/fuzz_targets/connection.rs @@ -143,7 +143,7 @@ fn fuzz(input: FuzzInput) { continue; } - dialer_sender.send(msg).await.unwrap(); + dialer_sender.send(msg.as_slice()).await.unwrap(); let received = listener_receiver.recv().await.unwrap(); assert_eq!(&received[..], &msg[..], "Message {i} mismatch"); } @@ -154,7 +154,7 @@ fn fuzz(input: FuzzInput) { continue; } - listener_sender.send(msg).await.unwrap(); + listener_sender.send(msg.as_slice()).await.unwrap(); let received = dialer_receiver.recv().await.unwrap(); assert_eq!(&received[..], &msg[..], "Message {i} mismatch"); } diff --git a/stream/fuzz/fuzz_targets/e2e.rs b/stream/fuzz/fuzz_targets/e2e.rs index 14ea8252dd..aa166e3fdb 100644 --- a/stream/fuzz/fuzz_targets/e2e.rs +++ b/stream/fuzz/fuzz_targets/e2e.rs @@ -136,7 +136,7 @@ fn fuzz(input: FuzzInput) { let mut corruption_i = 0; let announce = recv_frame(&mut adversary_d_stream, MAX_MESSAGE_SIZE).await?; - send_frame(&mut adversary_d_sink, &announce, MAX_MESSAGE_SIZE).await?; + send_frame(&mut adversary_d_sink, announce, MAX_MESSAGE_SIZE).await?; let mut m1 = recv_frame(&mut adversary_d_stream, MAX_MESSAGE_SIZE) .await? @@ -147,7 +147,7 @@ fn fuzz(input: FuzzInput) { corruption_i += 1; } } - send_frame(&mut adversary_d_sink, &m1, MAX_MESSAGE_SIZE).await?; + send_frame(&mut adversary_d_sink, m1.as_slice(), MAX_MESSAGE_SIZE).await?; let mut m2 = recv_frame(&mut adversary_l_stream, MAX_MESSAGE_SIZE) .await? @@ -158,7 +158,7 @@ fn fuzz(input: FuzzInput) { corruption_i += 1; } } - send_frame(&mut adversary_l_sink, &m2, MAX_MESSAGE_SIZE).await?; + send_frame(&mut adversary_l_sink, m2.as_slice(), MAX_MESSAGE_SIZE).await?; let mut m3 = recv_frame(&mut adversary_d_stream, MAX_MESSAGE_SIZE) .await? @@ -171,7 +171,7 @@ fn fuzz(input: FuzzInput) { } let sent_corrupted_data = setup_corruption.iter().take(corruption_i).any(|x| *x != 0); - send_frame(&mut adversary_d_sink, &m3, MAX_MESSAGE_SIZE).await?; + send_frame(&mut adversary_d_sink, m3.as_slice(), MAX_MESSAGE_SIZE).await?; Ok(( sent_corrupted_data, adversary_d_stream, @@ -236,9 +236,9 @@ fn fuzz(input: FuzzInput) { &mut d_receiver, ), }; - sender.send(&data).await.unwrap(); + sender.send(data.as_slice()).await.unwrap(); let frame = recv_frame(a_in, MAX_MESSAGE_SIZE).await.unwrap(); - send_frame(a_out, &frame, MAX_MESSAGE_SIZE).await.unwrap(); + send_frame(a_out, frame, MAX_MESSAGE_SIZE).await.unwrap(); let data2 = receiver.recv().await.unwrap(); assert_eq!(data, data2, "expected data to match"); } @@ -262,9 +262,11 @@ fn fuzz(input: FuzzInput) { &mut d_receiver, ), }; - sender.send(&[]).await.unwrap(); + sender.send([].as_slice()).await.unwrap(); let _ = recv_frame(a_in, MAX_MESSAGE_SIZE).await.unwrap(); - send_frame(a_out, &data, MAX_MESSAGE_SIZE).await.unwrap(); + send_frame(a_out, data.as_slice(), MAX_MESSAGE_SIZE) + .await + .unwrap(); let res = receiver.recv().await; assert!(res.is_err()); } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 0830d5687d..3dd84e8e06 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -63,7 +63,7 @@ pub mod utils; use crate::utils::codec::{recv_frame, send_frame}; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use commonware_codec::{DecodeExt, Encode as _, Error as CodecError}; use commonware_cryptography::{ handshake::{ @@ -197,13 +197,13 @@ pub async fn dial( peer, ), ); - send_frame(&mut sink, &syn.encode(), config.max_message_size).await?; + send_frame(&mut sink, syn.encode(), config.max_message_size).await?; let syn_ack_bytes = recv_frame(&mut stream, config.max_message_size).await?; let syn_ack = SynAck::::decode(syn_ack_bytes)?; let (ack, send, recv) = dial_end(state, syn_ack)?; - send_frame(&mut sink, &ack.encode(), config.max_message_size).await?; + send_frame(&mut sink, ack.encode(), config.max_message_size).await?; Ok(( Sender { @@ -264,7 +264,7 @@ pub async fn listen< ), msg1, )?; - send_frame(&mut sink, &syn_ack.encode(), config.max_message_size).await?; + send_frame(&mut sink, syn_ack.encode(), config.max_message_size).await?; let ack_bytes = recv_frame(&mut stream, config.max_message_size).await?; let ack = Ack::decode(ack_bytes)?; @@ -301,11 +301,14 @@ pub struct Sender { impl Sender { /// Encrypts and sends a message to the peer. - pub async fn send(&mut self, msg: &[u8]) -> Result<(), Error> { - let c = self.cipher.send(msg)?; + pub async fn send(&mut self, mut buf: impl Buf) -> Result<(), Error> { + // Copy the buffer to ensure contiguous memory for encryption. + let msg = buf.copy_to_bytes(buf.remaining()); + let c = self.cipher.send(msg.as_ref())?; + send_frame( &mut self.sink, - &c, + Bytes::from(c), self.max_message_size.saturating_add(CIPHERTEXT_OVERHEAD), ) .await?; @@ -394,10 +397,10 @@ mod test { assert_eq!(listener_peer, dialer_crypto.public_key()); let messages: Vec<&'static [u8]> = vec![b"A", b"B", b"C"]; for msg in &messages { - dialer_sender.send(msg).await?; + dialer_sender.send(&msg[..]).await?; let syn_ack = listener_receiver.recv().await?; assert_eq!(msg, &syn_ack); - listener_sender.send(msg).await?; + listener_sender.send(&msg[..]).await?; let ack = dialer_receiver.recv().await?; assert_eq!(msg, &ack); } diff --git a/stream/src/utils/codec.rs b/stream/src/utils/codec.rs index 157d635219..9683ec75f3 100644 --- a/stream/src/utils/codec.rs +++ b/stream/src/utils/codec.rs @@ -1,31 +1,28 @@ use crate::Error; -use bytes::{Bytes, BytesMut}; +use bytes::{Buf, Bytes, BytesMut}; use commonware_codec::{ varint::{Decoder, UInt}, - EncodeSize as _, Write as _, + Encode, }; use commonware_runtime::{Sink, Stream}; -use commonware_utils::StableBuf; /// Sends data to the sink with a varint length prefix. /// Returns an error if the message is too large or the stream is closed. pub async fn send_frame( sink: &mut S, - buf: &[u8], + buf: impl Buf + Send, max_message_size: u32, ) -> Result<(), Error> { // Validate frame size - let n = buf.len(); + let n = buf.remaining(); if n > max_message_size as usize { return Err(Error::SendTooLarge(n)); } // Prefix `buf` with its varint-encoded length and send it let len = UInt(n as u32); - let mut prefixed_buf = BytesMut::with_capacity(len.encode_size() + buf.len()); - len.write(&mut prefixed_buf); - prefixed_buf.extend_from_slice(buf); - sink.send(prefixed_buf).await.map_err(Error::SendFailed) + let data = len.encode().chain(buf); + sink.send(data).await.map_err(Error::SendFailed) } /// Receives data from the stream with a varint length prefix. @@ -34,30 +31,32 @@ pub async fn send_frame( pub async fn recv_frame(stream: &mut T, max_message_size: u32) -> Result { // Read and decode the varint length prefix byte-by-byte let mut decoder = Decoder::::new(); - let mut buf = StableBuf::from(vec![0u8; 1]); + let mut buf = [0u8; 1]; let len = loop { - buf = stream.recv(buf).await.map_err(Error::RecvFailed)?; + stream.recv(&mut buf[..]).await.map_err(Error::RecvFailed)?; match decoder.feed(buf[0]) { Ok(Some(len)) => break len as usize, Ok(None) => continue, Err(_) => return Err(Error::InvalidVarint), } }; - - // Validate frame size if len > max_message_size as usize { return Err(Error::RecvTooLarge(len)); } // Read the rest of the message - let read = stream.recv(vec![0; len]).await.map_err(Error::RecvFailed)?; - Ok(read.into()) + let mut read = BytesMut::zeroed(len); + stream + .recv(&mut read[..]) + .await + .map_err(Error::RecvFailed)?; + Ok(read.freeze()) } #[cfg(test)] mod tests { use super::*; - use bytes::BufMut; + use bytes::{BufMut, BytesMut}; use commonware_runtime::{deterministic, mocks, Runner}; use rand::Rng; @@ -72,7 +71,7 @@ mod tests { let mut buf = [0u8; MAX_MESSAGE_SIZE as usize]; context.fill(&mut buf); - let result = send_frame(&mut sink, &buf, MAX_MESSAGE_SIZE).await; + let result = send_frame(&mut sink, buf.as_slice(), MAX_MESSAGE_SIZE).await; assert!(result.is_ok()); let data = recv_frame(&mut stream, MAX_MESSAGE_SIZE).await.unwrap(); @@ -93,9 +92,9 @@ mod tests { context.fill(&mut buf2); // Send two messages of different sizes - let result = send_frame(&mut sink, &buf1, MAX_MESSAGE_SIZE).await; + let result = send_frame(&mut sink, buf1.as_slice(), MAX_MESSAGE_SIZE).await; assert!(result.is_ok()); - let result = send_frame(&mut sink, &buf2, MAX_MESSAGE_SIZE).await; + let result = send_frame(&mut sink, buf2.as_slice(), MAX_MESSAGE_SIZE).await; assert!(result.is_ok()); // Read both messages in order @@ -117,18 +116,17 @@ mod tests { let mut buf = [0u8; MAX_MESSAGE_SIZE as usize]; context.fill(&mut buf); - let result = send_frame(&mut sink, &buf, MAX_MESSAGE_SIZE).await; + let result = send_frame(&mut sink, buf.as_slice(), MAX_MESSAGE_SIZE).await; assert!(result.is_ok()); // Do the reading manually without using recv_frame // 1024 (MAX_MESSAGE_SIZE) encodes as varint: [0x80, 0x08] (2 bytes) - let read = stream.recv(vec![0; 2]).await.unwrap(); + let mut read = [0u8; 2]; + stream.recv(&mut read[..]).await.unwrap(); assert_eq!(read.as_ref(), &[0x80, 0x08]); // 1024 as varint - let read = stream - .recv(vec![0; MAX_MESSAGE_SIZE as usize]) - .await - .unwrap(); - assert_eq!(read.as_ref(), buf); + let mut read = vec![0u8; MAX_MESSAGE_SIZE as usize]; + stream.recv(&mut read[..]).await.unwrap(); + assert_eq!(read, buf); }); } @@ -141,7 +139,7 @@ mod tests { let mut buf = [0u8; MAX_MESSAGE_SIZE as usize]; context.fill(&mut buf); - let result = send_frame(&mut sink, &buf, MAX_MESSAGE_SIZE - 1).await; + let result = send_frame(&mut sink, buf.as_slice(), MAX_MESSAGE_SIZE - 1).await; assert!( matches!(&result, Err(Error::SendTooLarge(n)) if *n == MAX_MESSAGE_SIZE as usize) );