diff --git a/CHANGES.md b/CHANGES.md index 5ae71bf..152f6ad 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,8 @@ * Fix large transfers handling +* Fix Receiver link message size handling + ## [2.1.3] - 2024-04-11 * Handle settled transfers diff --git a/src/delivery.rs b/src/delivery.rs index ce3a086..426caed 100644 --- a/src/delivery.rs +++ b/src/delivery.rs @@ -300,6 +300,12 @@ impl DeliveryBuilder { } else if inner.closed { Err(AmqpProtocolError::Disconnected) } else { + if let Some(limit) = inner.max_message_size { + if self.data.len() > limit as usize { + return Err(AmqpProtocolError::BodyTooLarge); + } + } + let id = self .sender .get_mut() diff --git a/src/rcvlink.rs b/src/rcvlink.rs index 514deec..edbe949 100644 --- a/src/rcvlink.rs +++ b/src/rcvlink.rs @@ -32,7 +32,6 @@ pub(crate) struct ReceiverLinkInner { delivery_count: u32, error: Option, partial_body: Option, - partial_body_max: usize, max_message_size: u64, pool: PoolRef, } @@ -104,13 +103,6 @@ impl ReceiverLink { self.inner.get_mut().max_message_size = size; } - /// Set max total size for partial transfers. - /// - /// Default is 256Kb - pub fn set_max_partial_transfer_size(&self, size: usize) { - self.inner.get_mut().set_max_partial_transfer(size); - } - /// Check deliveries pub fn has_deliveries(&self) -> bool { !self.inner.get_mut().queue.is_empty() @@ -230,9 +222,8 @@ impl ReceiverLinkInner { credit: 0, error: None, partial_body: None, - partial_body_max: 262_144, delivery_count: frame.initial_delivery_count().unwrap_or(0), - max_message_size: frame.max_message_size().unwrap_or(0), + max_message_size: 262_144, reader_task: LocalWaker::new(), } } @@ -276,10 +267,6 @@ impl ReceiverLinkInner { } } - fn set_max_partial_transfer(&mut self, size: usize) { - self.partial_body_max = size; - } - pub(crate) fn set_link_credit(&mut self, credit: u32) { self.credit += credit; self.session @@ -308,6 +295,8 @@ impl ReceiverLinkInner { self.credit -= 1; } + println!("============= {:#?}\n{:?}", transfer, self.partial_body); + // handle batched transfer if let Some(ref mut body) = self.partial_body { if transfer.0.delivery_id.is_some() { @@ -329,7 +318,7 @@ impl ReceiverLinkInner { // merge transfer data and check size if let Some(transfer_body) = transfer.0.body.take() { - if body.len() + transfer_body.len() > self.partial_body_max { + if body.len() + transfer_body.len() > self.max_message_size as usize { let err = Error(Box::new(codec::ErrorInner { condition: LinkError::MessageSizeExceeded.into(), description: None, @@ -342,7 +331,7 @@ impl ReceiverLinkInner { transfer_body.encode(body); } - if !transfer.more() { + if transfer.more() { // dont need to update queue, we use first transfer frame as primary Ok(Action::None) } else { diff --git a/src/session.rs b/src/session.rs index e7a262f..b975d63 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1194,7 +1194,6 @@ impl SessionInner { tag: Bytes, body: TransferBody, settled: bool, - max_frame_size: Option, ) -> Result { loop { if self.remote_incoming_window == 0 { @@ -1227,7 +1226,7 @@ impl SessionInner { }; let message_format = body.message_format(); - let max_frame_size = max_frame_size.unwrap_or_else(|| self.max_frame_size()); + let max_frame_size = self.max_frame_size(); let max_frame_size = if max_frame_size > 2048 { max_frame_size - 2048 } else if max_frame_size == 0 { @@ -1276,6 +1275,7 @@ impl SessionInner { transfer.batchable(), transfer.settled(), ); + self.post_frame(Frame::Transfer(transfer)); loop { // last chunk @@ -1288,11 +1288,11 @@ impl SessionInner { log::trace!("{}: Sending chunk tranfer for {:?}", self.tag(), tag); let mut transfer = Transfer(Default::default()); - transfer.0.delivery_id = Some(delivery_id); transfer.0.handle = link_handle; transfer.0.body = Some(TransferBody::Data(chunk)); transfer.0.more = !body.is_empty(); transfer.0.batchable = true; + transfer.0.message_format = message_format; self.post_frame(Frame::Transfer(transfer)); } } else { diff --git a/src/sndlink.rs b/src/sndlink.rs index c84fae6..9bdd2e3 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -354,7 +354,7 @@ impl SenderLinkInner { self.session .inner .get_mut() - .send_transfer(self.id as u32, tag, body, settled, self.max_message_size) + .send_transfer(self.id as u32, tag, body, settled) .await } } diff --git a/tests/test_server.rs b/tests/test_server.rs index 344dbf4..d9ea702 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -94,6 +94,8 @@ async fn test_simple() -> std::io::Result<()> { #[ntex::test] async fn test_large_transfer() -> std::io::Result<()> { + env_logger::init(); + let mut rng = thread_rng(); let data: String = (0..2048) .map(|_| rng.sample(Alphanumeric) as char) @@ -112,9 +114,12 @@ async fn test_large_transfer() -> std::io::Result<()> { server::Handshake::Sasl(_) => Err(()), } }) + .config(|cfg| { + cfg.max_frame_size(1024); + }) .control(|msg: ControlFrame| async move { if let ControlFrameKind::AttachReceiver(_, rcv) = msg.kind() { - rcv.set_max_message_size(1024); + rcv.set_max_message_size(10 * 1024); } Ok::<_, ()>(()) })