Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 55 additions & 86 deletions rust/src/mqtt/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ impl MQTTState {
}

fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let mut tx = MQTTTransaction::new(msg, direction);
self.tx_id += 1;
tx.tx_id = self.tx_id;
Expand Down Expand Up @@ -217,104 +217,82 @@ impl MQTTState {
match msg.op {
MQTTOperation::CONNECT(ref conn) => {
self.protocol_version = conn.protocol_version;
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
if self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
self.transactions.push_back(tx);
}
self.transactions.push_back(tx);
}
MQTTOperation::PUBLISH(ref publish) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
match msg.header.qos_level {
let qos = msg.header.qos_level;
let pkt_id = publish.message_id;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient);
if let Some(pkt_id) = pkt_id {
tx.pkt_id = Some(pkt_id as u32);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
self.transactions.push_back(tx);
}
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
let pkt_id = subscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
let pkt_id = unsubscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
self.transactions.push_back(tx);
}
MQTTOperation::CONNACK(ref _connack) => {
if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
Expand All @@ -325,72 +303,65 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::SUBACK(ref suback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand All @@ -406,25 +377,19 @@ impl MQTTState {
self.transactions.push_back(tx);
}
MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand Down Expand Up @@ -608,7 +573,11 @@ impl MQTTState {
}

fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) {
let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer });
let mut tx = MQTTTransaction::new_empty(if toclient {
Direction::ToClient
} else {
Direction::ToServer
});
self.tx_id += 1;
tx.tx_id = self.tx_id;
if toclient {
Expand Down
Loading