Skip to content

Commit

Permalink
fix: filter unexpected PubAck in event loop pending queue. (#859)
Browse files Browse the repository at this point in the history
* Update eventloop.rs

filter unexpected puback.

* Update eventloop.rs

filter unexpected puback

* cargo fmt

* Update CHANGELOG.md

update CHANGELOG.md

* Update rumqttc/src/eventloop.rs

apply suggestion from rumqtt team

Co-authored-by: Devdutt Shenoi <[email protected]>

* Update rumqttc/src/v5/eventloop.rs

apply suggestion from rumqtt team

Co-authored-by: Devdutt Shenoi <[email protected]>

---------

Co-authored-by: Devdutt Shenoi <[email protected]>
Co-authored-by: Devdutt Shenoi <[email protected]>
  • Loading branch information
3 people authored May 22, 2024
1 parent 83d8f77 commit 51224e1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes
* Filter PUBACK in pending save requests to fix unexpected PUBACK sent to reconnected broker.
* Resume session only if broker sends `CONNACK` with `session_present == 1`.

### Security
Expand Down
10 changes: 9 additions & 1 deletion rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,15 @@ impl EventLoop {
self.pending.extend(self.state.clean());

// drain requests from channel which weren't yet received
let requests_in_channel = self.requests_rx.drain();
let mut requests_in_channel: Vec<_> = self.requests_rx.drain().collect();

requests_in_channel.retain(|request| {
match request {
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
_ => true,
}
});

self.pending.extend(requests_in_channel);
}

Expand Down
10 changes: 9 additions & 1 deletion rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,15 @@ impl EventLoop {
self.pending.extend(self.state.clean());

// drain requests from channel which weren't yet received
let requests_in_channel = self.requests_rx.drain();
let mut requests_in_channel: Vec<_> = self.requests_rx.drain().collect();

requests_in_channel.retain(|request| {
match request {
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
_ => true,
}
});

self.pending.extend(requests_in_channel);
}

Expand Down

0 comments on commit 51224e1

Please sign in to comment.