Skip to content

Commit

Permalink
refactor(rumqttc): Replace Vec with FixedBitSet for QoS 2 packet trac… (
Browse files Browse the repository at this point in the history
#869)

* refactor(rumqttc): Replace Vec with FixedBitSet for QoS 2 packet tracking.(#868)

* refactor(rumqttc): Undo unrelated import changes in state.rs. (#868)

* refactor(rumqttc): `v5/state.rs` Replace Vec with FixedBitSet for QoS 2 publishes (#868)

* update CHANGELOG.md
  • Loading branch information
hippalus authored May 22, 2024
1 parent 51224e1 commit 005c60e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 69 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* use `Framed` to encode/decode MQTT packets.
* use `Login` to store credentials
* Made `DisconnectProperties` struct public.
* Replace `Vec<Option<u16>>` with `FixedBitSet` for managing packet ids of released QoS 2 publishes and incoming QoS 2 publishes in `MqttState`.

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ url = { version = "2", default-features = false, optional = true }
# proxy
async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true }
tokio-stream = "0.1.15"
fixedbitset = "0.5.7"

[dev-dependencies]
bincode = "1.3.3"
Expand Down
56 changes: 21 additions & 35 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{Event, Incoming, Outgoing, Request};

use crate::mqttbytes::v4::*;
use crate::mqttbytes::{self, *};
use fixedbitset::FixedBitSet;
use std::collections::VecDeque;
use std::{io, time::Instant};

Expand Down Expand Up @@ -62,9 +63,9 @@ pub struct MqttState {
/// Outgoing QoS 1, 2 publishes which aren't acked yet
pub(crate) outgoing_pub: Vec<Option<Publish>>,
/// Packet ids of released QoS 2 publishes
pub(crate) outgoing_rel: Vec<Option<u16>>,
pub(crate) outgoing_rel: FixedBitSet,
/// Packet ids on incoming QoS 2 publishes
pub(crate) incoming_pub: Vec<Option<u16>>,
pub(crate) incoming_pub: FixedBitSet,
/// Last collision due to broker not acking in order
pub collision: Option<Publish>,
/// Buffered incoming packets
Expand All @@ -89,8 +90,8 @@ impl MqttState {
max_inflight,
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1),
incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1),
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand All @@ -113,17 +114,14 @@ impl MqttState {
}

// remove and collect pending releases
for rel in self.outgoing_rel.iter_mut() {
if let Some(pkid) = rel.take() {
let request = Request::PubRel(PubRel::new(pkid));
pending.push(request);
}
for pkid in self.outgoing_rel.ones() {
let request = Request::PubRel(PubRel::new(pkid as u16));
pending.push(request);
}
self.outgoing_rel.clear();

// remove packed ids of incoming qos2 publishes
for id in self.incoming_pub.iter_mut() {
id.take();
}
// remove packet ids of incoming qos2 publishes
self.incoming_pub.clear();

self.await_pingresp = false;
self.collision_ping_count = 0;
Expand Down Expand Up @@ -210,7 +208,7 @@ impl MqttState {
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
self.incoming_pub[pkid as usize] = Some(pkid);
self.incoming_pub.insert(pkid as usize);

if !self.manual_acks {
let pubrec = PubRec::new(pkid);
Expand Down Expand Up @@ -261,7 +259,7 @@ impl MqttState {
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
self.outgoing_rel.insert(pubrec.pkid as usize);
let pubrel = PubRel { pkid: pubrec.pkid };
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);
Expand All @@ -270,16 +268,12 @@ impl MqttState {
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;

if publish.take().is_none() {
if !self.incoming_pub.contains(pubrel.pkid as usize) {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}

self.incoming_pub.set(pubrel.pkid as usize, false);
let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
let pubcomp = PubComp { pkid: pubrel.pkid };
self.events.push_back(event);
Expand All @@ -288,17 +282,12 @@ impl MqttState {
}

fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<Option<Packet>, StateError> {
if self
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?
.take()
.is_none()
{
if !self.outgoing_rel.contains(pubcomp.pkid as usize) {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}

self.outgoing_rel.set(pubcomp.pkid as usize, false);
self.inflight -= 1;
let packet = self.check_collision(pubcomp.pkid).map(|publish| {
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
Expand Down Expand Up @@ -486,7 +475,7 @@ impl MqttState {
_ => pubrel,
};

self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
self.outgoing_rel.insert(pubrel.pkid as usize);
self.inflight += 1;
Ok(pubrel)
}
Expand Down Expand Up @@ -610,10 +599,8 @@ mod test {
mqtt.handle_incoming_publish(&publish2).unwrap();
mqtt.handle_incoming_publish(&publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();

// only qos2 publish should be add to queue
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));
}

#[test]
Expand Down Expand Up @@ -656,8 +643,7 @@ mod test {
mqtt.handle_incoming_publish(&publish2).unwrap();
mqtt.handle_incoming_publish(&publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));

assert!(mqtt.events.is_empty());
}
Expand Down Expand Up @@ -725,7 +711,7 @@ mod test {
assert_eq!(backup.unwrap().pkid, 1);

// check if the qos2 element's release pkid is 2
assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
assert!(mqtt.outgoing_rel.contains(2));
}

#[test]
Expand Down
53 changes: 20 additions & 33 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::mqttbytes::{self, Error as MqttError, QoS};
use super::{Event, Incoming, Outgoing, Request};

use bytes::Bytes;
use fixedbitset::FixedBitSet;
use std::collections::{HashMap, VecDeque};
use std::{io, time::Instant};

Expand Down Expand Up @@ -104,9 +105,9 @@ pub struct MqttState {
/// Outgoing QoS 1, 2 publishes which aren't acked yet
pub(crate) outgoing_pub: Vec<Option<Publish>>,
/// Packet ids of released QoS 2 publishes
pub(crate) outgoing_rel: Vec<Option<u16>>,
pub(crate) outgoing_rel: FixedBitSet,
/// Packet ids on incoming QoS 2 publishes
pub(crate) incoming_pub: Vec<Option<u16>>,
pub(crate) incoming_pub: FixedBitSet,
/// Last collision due to broker not acking in order
pub collision: Option<Publish>,
/// Buffered incoming packets
Expand Down Expand Up @@ -137,8 +138,8 @@ impl MqttState {
inflight: 0,
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1),
incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1),
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand All @@ -163,17 +164,14 @@ impl MqttState {
}

// remove and collect pending releases
for rel in self.outgoing_rel.iter_mut() {
if let Some(pkid) = rel.take() {
let request = Request::PubRel(PubRel::new(pkid, None));
pending.push(request);
}
for pkid in self.outgoing_rel.ones() {
let request = Request::PubRel(PubRel::new(pkid as u16, None));
pending.push(request);
}
self.outgoing_rel.clear();

// remove packed ids of incoming qos2 publishes
for id in self.incoming_pub.iter_mut() {
id.take();
}
self.incoming_pub.clear();

self.await_pingresp = false;
self.collision_ping_count = 0;
Expand Down Expand Up @@ -349,7 +347,7 @@ impl MqttState {
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
self.incoming_pub[pkid as usize] = Some(pkid);
self.incoming_pub.insert(pkid as usize);

if !self.manual_acks {
let pubrec = PubRec::new(pkid, None);
Expand Down Expand Up @@ -416,23 +414,19 @@ impl MqttState {
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
self.outgoing_rel.insert(pubrec.pkid as usize);
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);

Ok(Some(Packet::PubRel(PubRel::new(pubrec.pkid, None))))
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;

if publish.take().is_none() {
if !self.incoming_pub.contains(pubrel.pkid as usize) {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}
self.incoming_pub.set(pubrel.pkid as usize, false);

if pubrel.reason != PubRelReason::Success {
return Err(StateError::PubRelFail {
Expand All @@ -456,14 +450,11 @@ impl MqttState {
Packet::Publish(publish)
});

let pubrel = self
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?;
if pubrel.take().is_none() {
if !self.outgoing_rel.contains(pubcomp.pkid as usize) {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}
self.outgoing_rel.set(pubcomp.pkid as usize, false);

if pubcomp.reason != PubCompReason::Success {
return Err(StateError::PubCompFail {
Expand Down Expand Up @@ -668,7 +659,7 @@ impl MqttState {
_ => pubrel,
};

self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
self.outgoing_rel.insert(pubrel.pkid as usize);
self.inflight += 1;
Ok(pubrel)
}
Expand Down Expand Up @@ -824,10 +815,8 @@ mod test {
mqtt.handle_incoming_publish(&mut publish2).unwrap();
mqtt.handle_incoming_publish(&mut publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();

// only qos2 publish should be add to queue
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));
}

#[test]
Expand Down Expand Up @@ -870,9 +859,7 @@ mod test {
mqtt.handle_incoming_publish(&mut publish2).unwrap();
mqtt.handle_incoming_publish(&mut publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();
assert_eq!(pkid, 3);

assert!(mqtt.incoming_pub.contains(3));
assert!(mqtt.events.is_empty());
}

Expand Down Expand Up @@ -940,7 +927,7 @@ mod test {
assert_eq!(backup.unwrap().pkid, 1);

// check if the qos2 element's release pkid is 2
assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
assert!(mqtt.outgoing_rel.contains(2));
}

#[test]
Expand Down

0 comments on commit 005c60e

Please sign in to comment.