Skip to content

Commit

Permalink
take prefix by reference
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed Jul 10, 2024
1 parent 6e91ee1 commit 0c632f1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [miniconf_mqtt] the `/alive` message is now configurable
* [py/miniconf-mqtt] `discover()` returns the prefix and the alive payload
* [miniconf_mqtt] the prefix `&str` must now outlive the miniconf client

### Added

Expand Down
39 changes: 19 additions & 20 deletions miniconf_mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,21 @@ impl From<ResponseCode> for minimq::Property<'static> {
/// let mut settings = Settings::default();
/// client.update(&mut settings).unwrap();
/// ```
pub struct MqttClient<'buf, Settings, Stack, Clock, Broker, const Y: usize>
pub struct MqttClient<'a, Settings, Stack, Clock, Broker, const Y: usize>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
Broker: minimq::Broker,
{
mqtt: minimq::Minimq<'buf, Stack, Clock, Broker>,
mqtt: minimq::Minimq<'a, Stack, Clock, Broker>,
state: sm::StateMachine<sm::Context<Clock>>,
prefix: String<MAX_TOPIC_LENGTH>,
alive: &'buf str,
prefix: &'a str,
alive: &'a str,
pending: Multipart<Settings, Y>,
}

impl<'buf, Settings, Stack, Clock, Broker, const Y: usize>
MqttClient<'buf, Settings, Stack, Clock, Broker, Y>
impl<'a, Settings, Stack, Clock, Broker, const Y: usize>
MqttClient<'a, Settings, Stack, Clock, Broker, Y>
where
for<'de> Settings: JsonCoreSlash<'de, Y> + Clone,
Stack: TcpClientStack,
Expand All @@ -250,18 +250,17 @@ where
/// * `config` - The configuration of the MQTT client.
pub fn new(
stack: Stack,
prefix: &str,
prefix: &'a str,
clock: Clock,
config: ConfigBuilder<'buf, Broker>,
config: ConfigBuilder<'a, Broker>,
) -> Result<Self, ProtocolError> {
assert!(
prefix.len() + "/settings".len() + Settings::metadata().max_length("/")
<= MAX_TOPIC_LENGTH
);

// Configure a will so that we can indicate whether or not we are connected.
let prefix = String::try_from(prefix).unwrap();
let mut will = prefix.clone();
let mut will: String<MAX_TOPIC_LENGTH> = prefix.try_into().unwrap();
will.push_str("/alive").unwrap();
// Retained empty payload amounts to clearing the retained value (see MQTT spec).
let will = minimq::Will::new(&will, b"", &[])?
Expand All @@ -283,7 +282,7 @@ where
/// The default is to publish `1`.
/// The message is retained by the broker.
/// On disconnect the message is cleared retained through an MQTT will.
pub fn set_alive(&mut self, alive: &'buf str) {
pub fn set_alive(&mut self, alive: &'a str) {
self.alive = alive;
}

Expand Down Expand Up @@ -347,7 +346,7 @@ where

fn alive(&mut self) -> Result<(), minimq::PubError<Stack::Error, ()>> {
// Publish a connection status message.
let mut topic = self.prefix.clone();
let mut topic: String<MAX_TOPIC_LENGTH> = self.prefix.try_into().unwrap();
topic.push_str("/alive").unwrap();
let msg = Publication::new(self.alive.as_bytes())
.topic(&topic)
Expand All @@ -359,7 +358,7 @@ where
}

fn subscribe(&mut self) -> Result<(), minimq::Error<Stack::Error>> {
let mut settings = self.prefix.clone();
let mut settings: String<MAX_TOPIC_LENGTH> = self.prefix.try_into().unwrap();
settings.push_str("/settings/#").unwrap();
let opts = SubscriptionOptions::default().ignore_local_messages();
let topics = [TopicFilter::new(&settings).options(opts)];
Expand Down Expand Up @@ -426,7 +425,7 @@ where
let (path, node) = path.unwrap(); // Note(unwraped): checked capacity
assert!(node.is_leaf()); // Note(assert): Iterator depth unlimited

let mut topic = self.prefix.clone();
let mut topic: String<MAX_TOPIC_LENGTH> = self.prefix.try_into().unwrap();
topic
.push_str("/settings")
.and_then(|_| topic.push_str(&path))
Expand Down Expand Up @@ -473,11 +472,11 @@ where
}
}

fn respond<'a, T: Display>(
fn respond<'b, T: Display>(
response: T,
code: ResponseCode,
request: &Properties<'a>,
client: &mut minimq::mqtt_client::MqttClient<'buf, Stack, Clock, Broker>,
request: &Properties<'b>,
client: &mut minimq::mqtt_client::MqttClient<'a, Stack, Clock, Broker>,
) -> Result<
(),
minimq::PubError<Stack::Error, embedded_io::WriteFmtError<embedded_io::SliceWriteError>>,
Expand All @@ -495,7 +494,7 @@ where
.map_err(minimq::Error::from)?,
)
.map_err(|err| {
debug!("Response failure: {err:?}");
info!("Response failure: {err:?}");
err
})
}
Expand All @@ -510,7 +509,7 @@ where
} = self;
mqtt.poll(|client, topic, payload, properties| {
let Some(path) = topic
.strip_prefix(prefix.as_str())
.strip_prefix(*prefix)
.and_then(|p| p.strip_prefix("/settings"))
.map(Path::<_, SEPARATOR>::from)
else {
Expand Down Expand Up @@ -555,7 +554,7 @@ where
Self::respond(err, ResponseCode::Error, properties, client).ok();
}
minimq::PubError::Error(minimq::Error::NotReady) => {
warn!("Not ready during Get.");
warn!("Not ready during Get. Discarding.");
}
minimq::PubError::Error(err) => {
error!("Get failure: {err:?}");
Expand Down

0 comments on commit 0c632f1

Please sign in to comment.