Skip to content

Commit

Permalink
mqtt: expose reset
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed Jul 10, 2024
1 parent 42b6d9c commit 540e5a9
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions miniconf_mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ where
self.alive = alive;
}

/// Reset and restart state machine.
///
/// This rests the state machine to start from the `Connect` state.
/// This will connect (if not connected), send the alive message, subscribe,
/// and perform the initial settings dump.
pub fn reset(&mut self) {
self.state.process_event(sm::Events::Reset).unwrap();
}

/// Update the MQTT interface and service the network.
///
/// # Returns
Expand All @@ -306,12 +315,12 @@ where
}
}
sm::States::Alive => {
if self.alive() {
if self.alive().is_ok() {
self.state.process_event(sm::Events::Alive).unwrap();
}
}
sm::States::Subscribe => {
if self.subscribe() {
if self.subscribe().is_ok() {
info!("Subscribed");
self.state.process_event(sm::Events::Subscribe).unwrap();
}
Expand All @@ -337,25 +346,25 @@ where
self.poll(settings).map(|c| c == Changed::Changed)
}

fn alive(&mut self) -> bool {
fn alive(&mut self) -> Result<(), minimq::PubError<Stack::Error, ()>> {
// Publish a connection status message.
let mut alive = self.prefix.clone();
alive.push_str("/alive").unwrap();
let mut topic = self.prefix.clone();
topic.push_str("/alive").unwrap();
let msg = Publication::new(self.alive.as_bytes())
.topic(&alive)
.topic(&topic)
.qos(QoS::AtLeastOnce)
.retain()
.finish()
.unwrap(); // Note(unwrap): has topic
self.mqtt.client().publish(msg).is_ok()
self.mqtt.client().publish(msg)
}

fn subscribe(&mut self) -> bool {
fn subscribe(&mut self) -> Result<(), minimq::Error<Stack::Error>> {
let mut settings = self.prefix.clone();
settings.push_str("/settings/#").unwrap();
let opts = SubscriptionOptions::default().ignore_local_messages();
let topics = [TopicFilter::new(&settings).options(opts)];
self.mqtt.client().subscribe(&topics, &[]).is_ok()
self.mqtt.client().subscribe(&topics, &[])
}

/// Force republication of the current settings.
Expand Down

0 comments on commit 540e5a9

Please sign in to comment.