diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index ea7076dde..cd0568ada 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -130,7 +130,7 @@ impl EventLoop { requests_in_channel.retain(|request| { match request { - Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack + Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack _ => true, } }); @@ -398,20 +398,20 @@ async fn mqtt_connect( options: &mut MqttOptions, network: &mut Network, ) -> Result { - let keep_alive = options.keep_alive().as_secs() as u16; - let clean_start = options.clean_start(); - let client_id = options.client_id(); - let properties = options.connect_properties(); - - let connect = Connect { - keep_alive, - client_id, - clean_start, - properties, - }; + let packet = Packet::Connect( + Connect { + client_id: options.client_id(), + keep_alive: options.keep_alive().as_secs() as u16, + clean_start: options.clean_start(), + properties: options.connect_properties(), + }, + options.last_will(), + options.credentials(), + ); // send mqtt connect packet - network.connect(connect, options).await?; + network.write(packet).await?; + network.flush().await?; // validate connack match network.read().await? { diff --git a/rumqttc/src/v5/framed.rs b/rumqttc/src/v5/framed.rs index c7e06a250..4a5c3049d 100644 --- a/rumqttc/src/v5/framed.rs +++ b/rumqttc/src/v5/framed.rs @@ -5,8 +5,7 @@ use tokio_util::codec::Framed; use crate::framed::AsyncReadWrite; use super::mqttbytes::v5::Packet; -use super::{mqttbytes, Codec, Connect, MqttOptions, MqttState}; -use super::{Incoming, StateError}; +use super::{mqttbytes, Codec, Incoming, MqttState, StateError}; /// Network transforms packets <-> frames efficiently. It takes /// advantage of pre-allocation, buffering and vectorization when @@ -86,19 +85,6 @@ impl Network { .map_err(StateError::Deserialization) } - pub async fn connect( - &mut self, - connect: Connect, - options: &MqttOptions, - ) -> Result<(), StateError> { - let last_will = options.last_will(); - let login = options.credentials(); - self.write(Packet::Connect(connect, last_will, login)) - .await?; - - self.flush().await - } - pub async fn flush(&mut self) -> Result<(), StateError> { self.framed .flush()