Skip to content

Commit

Permalink
Fix large transfers (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Apr 13, 2024
1 parent b1c3d88 commit e612ab5
Show file tree
Hide file tree
Showing 18 changed files with 103 additions and 56 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.1.4] - 2024-04-13

* Fix large transfers handling

## [2.1.3] - 2024-04-11

* Handle settled transfers
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "2.1.3"
version = "2.1.4"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down Expand Up @@ -37,7 +37,9 @@ uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
env_logger = "0.11"
rand = "0.8"
ntex = { version = "1", features = ["tokio"] }
ntex-amqp = { path = ".", features = ["frame-trace"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
1 change: 0 additions & 1 deletion codec/codegen/definitions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(unused_assignments, unused_variables, unreachable_patterns)]

use std::u8;
use derive_more::From;

use super::*;
Expand Down
2 changes: 1 addition & 1 deletion codec/src/codec/decode.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{char, collections, convert::TryFrom, hash::BuildHasher, hash::Hash, u8};
use std::{char, collections, convert::TryFrom, hash::BuildHasher, hash::Hash};

use byteorder::{BigEndian, ByteOrder};
use chrono::{DateTime, TimeZone, Utc};
Expand Down
3 changes: 1 addition & 2 deletions codec/src/codec/encode.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::hash::{BuildHasher, Hash};
use std::{collections::HashMap, i8, u8};
use std::{collections::HashMap, hash::BuildHasher, hash::Hash};

use chrono::{DateTime, Utc};
use ntex_bytes::{BufMut, ByteString, Bytes, BytesMut};
Expand Down
4 changes: 2 additions & 2 deletions codec/src/message/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Encode for MessageBody {
.fold(0, |a, seq| a + seq.encoded_size() + SECTION_PREFIX_LENGTH);
size += self.messages.iter().fold(0, |a, m| {
let length = m.encoded_size();
let size = length + if length > std::u8::MAX as usize { 5 } else { 2 };
let size = length + if length > u8::MAX as usize { 5 } else { 2 };
a + size + SECTION_PREFIX_LENGTH
});

Expand Down Expand Up @@ -75,7 +75,7 @@ impl Encode for MessageBody {

// Bytes prefix
let length = m.encoded_size();
if length > std::u8::MAX as usize {
if length > u8::MAX as usize {
dst.put_u8(FORMATCODE_BINARY32);
dst.put_u32(length as u32);
} else {
Expand Down
2 changes: 1 addition & 1 deletion codec/src/message/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl Message {
/// Create new message and set `correlation_id` property
pub fn reply_message(&self) -> Message {
Message::default().if_some(&self.0.properties, |mut msg, data| {
msg.set_properties(|props| props.correlation_id = data.message_id.clone());
msg.set_properties(|props| props.correlation_id.clone_from(&data.message_id));
msg
})
}
Expand Down
1 change: 0 additions & 1 deletion codec/src/protocol/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use super::*;
use crate::codec::{decode_format_code, decode_list_header};
use derive_more::From;
use std::u8;

#[derive(Clone, Debug, PartialEq, Eq, From)]
pub enum Frame {
Expand Down
14 changes: 1 addition & 13 deletions src/cell.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
//! Custom cell impl
use std::cell::UnsafeCell;
use std::ops::Deref;
use std::rc::{Rc, Weak};
use std::{cell::UnsafeCell, ops::Deref, rc::Rc};

pub(crate) struct Cell<T> {
inner: Rc<UnsafeCell<T>>,
}

pub(crate) struct WeakCell<T> {
inner: Weak<UnsafeCell<T>>,
}

impl<T> Clone for Cell<T> {
fn clone(&self) -> Self {
Self {
Expand Down Expand Up @@ -49,9 +43,3 @@ impl<T> Cell<T> {
unsafe { &mut *self.inner.as_ref().get() }
}
}

impl<T: std::fmt::Debug> std::fmt::Debug for WeakCell<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.fmt(f)
}
}
10 changes: 5 additions & 5 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ impl ConnectionRef {
let begin = Begin(Box::new(codec::BeginInner {
remote_channel: None,
next_outgoing_id: INITIAL_NEXT_OUTGOING_ID,
incoming_window: std::u32::MAX,
outgoing_window: std::u32::MAX,
handle_max: std::u32::MAX,
incoming_window: u32::MAX,
outgoing_window: u32::MAX,
handle_max: u32::MAX,
offered_capabilities: None,
desired_capabilities: None,
properties: None,
Expand Down Expand Up @@ -305,9 +305,9 @@ impl ConnectionInner {
let begin = Begin(Box::new(codec::BeginInner {
remote_channel: Some(remote_channel_id),
next_outgoing_id: 1,
incoming_window: std::u32::MAX,
incoming_window: u32::MAX,
outgoing_window: begin.incoming_window(),
handle_max: std::u32::MAX,
handle_max: u32::MAX,
offered_capabilities: None,
desired_capabilities: None,
properties: None,
Expand Down
8 changes: 2 additions & 6 deletions src/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,8 @@ impl DeliveryBuilder {

if let Some(ref err) = inner.error {
Err(err.clone())
} else if inner
.max_message_size
.map(|l| self.data.len() > l as usize)
.unwrap_or_default()
{
Err(AmqpProtocolError::BodyTooLarge)
} else if inner.closed {
Err(AmqpProtocolError::Disconnected)
} else {
let id = self
.sender
Expand Down
6 changes: 3 additions & 3 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ where
let fut = self
.service
.call_static(types::Message::Attached(frm.clone(), link.clone()));
ntex::rt::spawn(async move {
let _ = ntex::rt::spawn(async move {
let result = fut.await;
if let Err(err) = result {
let _ = link.close_with_error(Error::from(err)).await;
Expand Down Expand Up @@ -337,7 +337,7 @@ where
types::Action::DetachReceiver(link, frm) => {
let lnk = link.clone();
let fut = self.service.call_static(types::Message::Detached(lnk));
spawn(async move {
let _ = spawn(async move {
let _ = fut.await;
});
self.call_control_service(ControlFrame::new(
Expand All @@ -357,7 +357,7 @@ where
let fut = self
.service
.call_static(types::Message::DetachedAll(receivers));
spawn(async move {
let _ = spawn(async move {
let _ = fut.await;
});
self.call_control_service(ControlFrame::new_kind(
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::TryFrom, error, io};
use std::{error, io};

use ntex::util::{ByteString, Either};

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Configuration {
Configuration {
disp_config,
max_size: 0,
max_frame_size: std::u16::MAX as u32,
max_frame_size: u16::MAX as u32,
channel_max: 1024,
idle_time_out: 120_000,
hostname: None,
Expand Down
6 changes: 3 additions & 3 deletions src/router.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::TryFrom, future::poll_fn, marker, rc::Rc};
use std::{future::poll_fn, marker, rc::Rc};

use ntex::router::{IntoPattern, Router as PatternRouter};
use ntex::service::{
Expand Down Expand Up @@ -140,7 +140,7 @@ impl<S: 'static> Service<Message> for RouterService<S> {
if let Some(Some(srv)) = self.0.get_mut().handlers.remove(&link) {
log::trace!("Releasing handler service for {}", link.name());
let name = link.name().clone();
ntex::rt::spawn(async move {
let _ = ntex::rt::spawn(async move {
poll_fn(move |cx| srv.poll_shutdown(cx)).await;
log::trace!("Handler service for {} has shutdown", name);
});
Expand Down Expand Up @@ -168,7 +168,7 @@ impl<S: 'static> Service<Message> for RouterService<S> {
futs.len()
);

ntex::rt::spawn(async move {
let _ = ntex::rt::spawn(async move {
let len = futs.len();
let _ = join_all(futs).await;
log::trace!(
Expand Down
24 changes: 10 additions & 14 deletions src/session.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, convert::TryFrom, fmt, future::Future};
use std::{cmp, collections::VecDeque, fmt, future::Future, mem};

use ntex::channel::{condition, oneshot, pool};
use ntex::util::{ByteString, Bytes, Either, HashMap, PoolRef, Ready};
Expand Down Expand Up @@ -899,7 +899,7 @@ impl SessionInner {
.max_message_size()
.map(|v| u32::try_from(v).unwrap_or(u32::MAX)),
));
let local_sender = std::mem::replace(
let local_sender = mem::replace(
item,
SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
);
Expand Down Expand Up @@ -1147,7 +1147,7 @@ impl SessionInner {
} else {
None
},
incoming_window: std::u32::MAX,
incoming_window: u32::MAX,
next_outgoing_id: self.next_outgoing_id,
outgoing_window: self.remote_incoming_window,
handle: None,
Expand All @@ -1174,7 +1174,7 @@ impl SessionInner {
} else {
None
},
incoming_window: std::u32::MAX,
incoming_window: u32::MAX,
next_outgoing_id: self.next_outgoing_id,
outgoing_window: self.remote_incoming_window,
handle: Some(handle),
Expand Down Expand Up @@ -1247,7 +1247,7 @@ impl SessionInner {
}
};

let chunk = body.split_to(std::cmp::min(max_frame_size, body.len()));
let chunk = body.split_to(cmp::min(max_frame_size, body.len()));

let mut transfer = Transfer(Default::default());
transfer.0.handle = link_handle;
Expand Down Expand Up @@ -1278,24 +1278,20 @@ impl SessionInner {
);

loop {
let chunk = body.split_to(std::cmp::min(max_frame_size, body.len()));

// last chunk
if body.is_empty() {
log::trace!("{}: Sending last tranfer for {:?}", self.tag(), tag);

let mut transfer = Transfer(Default::default());
transfer.0.more = false;
self.post_frame(Frame::Transfer(transfer));
log::trace!("{}: Last tranfer for {:?} is sent", self.tag(), tag);
break;
}

log::trace!("{}: Sending chunk tranfer for {:?}", self.tag(), tag);
let chunk = body.split_to(cmp::min(max_frame_size, body.len()));

log::trace!("{}: Sending chunk tranfer for {:?}", self.tag(), tag);
let mut transfer = Transfer(Default::default());
transfer.0.delivery_id = Some(delivery_id);
transfer.0.handle = link_handle;
transfer.0.body = Some(TransferBody::Data(chunk));
transfer.0.more = true;
transfer.0.more = !body.is_empty();
transfer.0.batchable = true;
self.post_frame(Frame::Transfer(transfer));
}
Expand Down
2 changes: 1 addition & 1 deletion src/sndlink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, convert::TryFrom, future::Future};
use std::{collections::VecDeque, future::Future};

use ntex::channel::{condition, oneshot, pool};
use ntex::util::{BufMut, ByteString, Bytes, Either, PoolRef, Ready};
Expand Down
64 changes: 64 additions & 0 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use ntex::{http::Uri, rt, time::sleep, time::Millis};
use ntex_amqp::{
client, codec::protocol, error::LinkError, server, types, ControlFrame, ControlFrameKind,
};
use rand::{distributions::Alphanumeric, thread_rng, Rng};

async fn server(
_link: types::Link<()>,
Expand Down Expand Up @@ -91,6 +92,69 @@ async fn test_simple() -> std::io::Result<()> {
Ok(())
}

#[ntex::test]
async fn test_large_transfer() -> std::io::Result<()> {
let mut rng = thread_rng();
let data: String = (0..2048)
.map(|_| rng.sample(Alphanumeric) as char)
.collect();

let count = Arc::new(AtomicUsize::new(0));
let count2 = count.clone();
let srv = test_server(move || {
let count = count2.clone();
server::Server::build(|con: server::Handshake| async move {
match con {
server::Handshake::Amqp(con) => {
let con = con.open().await.unwrap();
Ok(con.ack(()))
}
server::Handshake::Sasl(_) => Err(()),
}
})
.control(|msg: ControlFrame| async move {
if let ControlFrameKind::AttachReceiver(_, rcv) = msg.kind() {
rcv.set_max_message_size(1024);
}
Ok::<_, ()>(())
})
.finish(
server::Router::<()>::new()
.service(
"test",
fn_factory_with_config(move |_: types::Link<()>| server_count(count.clone())),
)
.finish(),
)
});

let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap();
let client = client::Connector::new().connect(uri).await.unwrap();
let sink = client.sink();
ntex::rt::spawn(async move {
let _ = client.start_default().await;
});

let session = sink.open_session().await.unwrap();
let link = session
.build_sender_link("test", "test")
.attach()
.await
.unwrap();

let delivery = link
.delivery(Bytes::from(data.clone()))
.send()
.await
.unwrap();
let st = delivery.wait().await.unwrap().unwrap();
assert_eq!(st, protocol::DeliveryState::Accepted(protocol::Accepted {}));
sleep(Millis(250)).await;

assert_eq!(count.load(Ordering::Relaxed), 1);
Ok(())
}

async fn sasl_auth(auth: server::Sasl) -> Result<server::HandshakeAck<()>, server::HandshakeError> {
let init = auth
.mechanism("PLAIN")
Expand Down

0 comments on commit e612ab5

Please sign in to comment.