Skip to content

Commit

Permalink
Merge pull request #223 from quartiq/mqtt-rs-rework
Browse files Browse the repository at this point in the history
mqtt rs rework
  • Loading branch information
jordens authored Jul 9, 2024
2 parents f5ad869 + 7f7a591 commit 4eccb92
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 23 deletions.
2 changes: 1 addition & 1 deletion miniconf_mqtt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ name = "mqtt"

[dev-dependencies]
machine = "0.3"
env_logger = "0.10"
env_logger = "0.11"
std-embedded-nal = { git = "https://gitlab.com/ryan-summers/std-embedded-nal", branch = "feature/0.8" }
tokio = { version = "1.9", features = ["rt-multi-thread", "time", "macros"] }
std-embedded-time = "0.1"
Expand Down
35 changes: 13 additions & 22 deletions miniconf_mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Iter<M, const Y: usize> = NodeIter<M, Y, Path<String<MAX_TOPIC_LENGTH>, SEP
#[derive(Debug, PartialEq)]
pub enum Error<E> {
/// Miniconf
Miniconf(miniconf::Error<()>),
Miniconf(miniconf::Traversal),
/// State machine
State(sm::Error),
/// Minimq
Expand All @@ -52,20 +52,9 @@ impl<E> From<sm::Error> for Error<E> {
}
}

impl<E, F> From<miniconf::Error<F>> for Error<E> {
fn from(value: miniconf::Error<F>) -> Self {
Self::Miniconf(match value {
miniconf::Error::Finalization(_) => miniconf::Error::Finalization(()),
miniconf::Error::Inner(depth, _) => miniconf::Error::Inner(depth, ()),
miniconf::Error::Traversal(t) => miniconf::Error::Traversal(t),
_ => unimplemented!(),
})
}
}

impl<E> From<miniconf::Traversal> for Error<E> {
fn from(value: miniconf::Traversal) -> Self {
Self::Miniconf(value.into())
Self::Miniconf(value)
}
}

Expand Down Expand Up @@ -216,7 +205,6 @@ impl From<ResponseCode> for minimq::Property<'static> {
/// # Example
/// ```
/// use miniconf::Tree;
/// use miniconf_mqtt::MqttClient;
///
/// #[derive(Tree, Clone, Default)]
/// struct Settings {
Expand All @@ -225,11 +213,11 @@ impl From<ResponseCode> for minimq::Property<'static> {
///
/// let mut buffer = [0u8; 1024];
/// let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap();
/// let mut client: MqttClient<'_, _, _, _, minimq::broker::IpBroker, 1> = MqttClient::new(
/// let mut client = miniconf_mqtt::MqttClient::new(
/// std_embedded_nal::Stack::default(),
/// "quartiq/application/12345", // prefix
/// std_embedded_time::StandardClock::default(),
/// minimq::ConfigBuilder::new(localhost.into(), &mut buffer),
/// minimq::ConfigBuilder::<minimq::broker::IpBroker>::new(localhost.into(), &mut buffer),
/// )
/// .unwrap();
/// let mut settings = Settings::default();
Expand Down Expand Up @@ -324,7 +312,7 @@ where
}
sm::States::Init => {
info!("Republishing");
self.publish("").ok();
self.publish(None).ok();
}
sm::States::Multipart => {
if self.pending.response_topic.is_some() {
Expand All @@ -350,7 +338,6 @@ where
.retain()
.finish()
.unwrap(); // Note(unwrap): has topic

self.mqtt.client().publish(msg).is_ok()
}

Expand All @@ -367,9 +354,13 @@ where
/// # Note
/// This is intended to be used if modification of a setting had side effects that affected
/// another setting.
pub fn publish(&mut self, path: &str) -> Result<(), Error<Stack::Error>> {
self.pending = Multipart::default().root(&Path::<_, SEPARATOR>::from(path))?;
pub fn publish(&mut self, path: Option<&str>) -> Result<(), Error<Stack::Error>> {
let mut m = Multipart::default();
if let Some(path) = path {
m = m.root(&Path::<_, SEPARATOR>::from(path))?;
}
self.state.process_event(sm::Events::Multipart)?;
self.pending = m;
Ok(())
}

Expand Down Expand Up @@ -529,7 +520,7 @@ where
(state.state() == &sm::States::Single)
.then_some(())
.ok_or("Pending multipart response")
.and_then(|()| Multipart::<Settings, Y>::try_from(properties))
.and_then(|()| Multipart::try_from(properties))
.map_or_else(
|err| {
Self::respond(err, ResponseCode::Error, properties, client)
Expand All @@ -538,7 +529,7 @@ where
|m| {
*pending = m.root(&path).unwrap(); // Note(unwrap) checked that it's TooShort but valid leaf
state.process_event(sm::Events::Multipart).unwrap();
// Response comes through iter_list/iter_dump
// Responses come through iter_list/iter_dump
},
);
}
Expand Down
4 changes: 4 additions & 0 deletions py/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ DUT_PID=$!
REPUB=$(timeout --foreground 3 mosquitto_sub -t "$PREFIX/+/settings/#" -h localhost | wc -l)
test $REPUB != 9 && exit 1

# test alive-ness
ALIVE=$(timeout --foreground 1 mosquitto_sub -t "$PREFIX/+/alive" -h localhost -F '%p' || true)
test $ALIVE != 1 && exit 1

# no discover SET
python -m miniconf -b localhost $PREFIX/id '/stream="192.0.2.16:9293"'
# discover miniconf command
Expand Down

0 comments on commit 4eccb92

Please sign in to comment.