Skip to content

Commit

Permalink
Merge pull request #20 from quartiq/feature/default-handler
Browse files Browse the repository at this point in the history
Restructuring minireq for flexibility
  • Loading branch information
jordens authored Jun 20, 2024
2 parents f17ed49 + 524a9c1 commit ab28a95
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 108 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

# Unreleased

## Changed
* [breaking] Handlers are no longer stored internally. The `poll()` function is now required to
perform the mapping to individual handlers. This is intended to make the handler API signature more
flexible, as not all handlers may need the same context.

## [0.4.0] - 2024-06-13

* [breaking] Updated to `minimq` v0.9.0
Expand Down
148 changes: 51 additions & 97 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,28 @@
//! MQTT Request/response Handling
//!
//! # Overview
//! This library is intended to be an easy way to handle inbound requests automatically.
//! This library is intended to be an easy way to handle inbound requests. You can subscribe to
//! topics belonging to some prefix, and Minireq will ensure that these topics are published to the
//! MQTT broker upon connection to handle discoverability.
//!
//! Handler functions can be associated with the library to be automatically called whenever a
//! specified request is received, and the handler will automatically be invoked with the request
//! data.
//!
//! ## Limitations
//! * The `poll()` function has a somewhat odd signature (using a function to provide the `Context`
//! and call the handler) due to required compatibility with RTIC and unlocked resources.
//!
//! * Handlers may only be closures that do not capture any local resources. Instead, move local
//! captures into the `Context`, which will be provided to the handler in the function call.
//! Minireq also simplifies the process of generating responses to the inbound request
//! automatically.
//!
//! ## Example
//! ```no_run
//! use embedded_io::Write;
//! # use embedded_nal::TcpClientStack;
//!
//! #[derive(Copy, Clone)]
//! struct Context {}
//!
//! #[derive(serde::Serialize, serde::Deserialize)]
//! struct Request {
//! data: u32,
//! }
//!
//! // Handler function for processing an incoming request.
//! pub fn handler(
//! context: &mut Context,
//! cmd: &str,
//! pub fn test_handler(
//! data: &[u8],
//! mut output_buffer: &mut [u8]
//! ) -> Result<usize, minireq::NoErrors> {
//! ) -> Result<usize, &'static str> {
//! // Deserialize the request.
//! let mut request: Request = serde_json_core::from_slice(data).unwrap().0;
//!
Expand All @@ -56,25 +45,24 @@
//! );
//!
//! // Construct the client
//! let mut handlers = [None; 10];
//! let mut client: minireq::Minireq<Context, _, _, _> = minireq::Minireq::new(
//! let mut client: minireq::Minireq<_, _, _> = minireq::Minireq::new(
//! "prefix/device",
//! mqtt,
//! &mut handlers[..],
//! )
//! .unwrap();
//!
//! // Whenever the `/test` command is received, call the associated handler.
//! // You may add as many handlers as you would like.
//! client.register("/test", handler).unwrap();
//! // We want to listen for any messages coming in on the "prefix/device/command/test" topic
//! client.subscribe("test").unwrap();
//!
//! // ...
//!
//! loop {
//! // In your main execution loop, continually poll the client to process incoming requests.
//! client.poll(|handler, command, data, buffer| {
//! let mut context = Context {};
//! handler(&mut context, command, data, buffer)
//! client.poll(|command, data, buffer| {
//! match command {
//! "test" => test_handler(data, buffer),
//! _ => unreachable!(),
//! }
//! }).unwrap();
//! }
//! ```
Expand All @@ -90,15 +78,6 @@ use minimq::{embedded_nal::TcpClientStack, embedded_time, QoS};
use heapless::String;
use log::{info, warn};

#[derive(Copy, Clone, Debug)]
pub struct NoErrors;

impl core::fmt::Display for NoErrors {
fn fmt(&self, _f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
unimplemented!()
}
}

pub enum ResponseCode {
Ok,
Error,
Expand All @@ -123,7 +102,7 @@ const MAX_TOPIC_LENGTH: usize = 128;

#[derive(Debug, PartialEq)]
pub enum Error<E> {
RegisterFailed,
SubscribeFailed,
PrefixTooLong,
Mqtt(minimq::Error<E>),
}
Expand All @@ -150,33 +129,28 @@ mod sm {
}
}

type Handler<Context, E> = fn(&mut Context, &str, &[u8], &mut [u8]) -> Result<usize, E>;

#[derive(Copy, Clone, Debug)]
pub struct HandlerMeta<Context, E> {
handler: Handler<Context, E>,
pub struct HandlerMeta {
republished: bool,
}

pub type HandlerSlot<'a, Context, E> = Option<(&'a str, HandlerMeta<Context, E>)>;
pub type HandlerSlot<'a> = Option<(&'a str, HandlerMeta)>;

/// MQTT request/response interface.
pub struct Minireq<'a, Context, Stack, Clock, Broker, E = NoErrors>
pub struct Minireq<'a, Stack, Clock, Broker, const SLOTS: usize = 10>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
Broker: minimq::broker::Broker,
E: core::fmt::Display,
{
machine: sm::StateMachine<MinireqContext<'a, Context, Stack, Clock, Broker, E>>,
machine: sm::StateMachine<MinireqContext<'a, Stack, Clock, Broker, SLOTS>>,
}

impl<'a, Context, Stack, Clock, Broker, E> Minireq<'a, Context, Stack, Clock, Broker, E>
impl<'a, Stack, Clock, Broker, const SLOTS: usize> Minireq<'a, Stack, Clock, Broker, SLOTS>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
Broker: minimq::broker::Broker,
E: core::fmt::Display,
{
/// Construct a new MQTT request handler.
///
Expand All @@ -185,65 +159,53 @@ where
pub fn new(
device_prefix: &str,
mqtt: minimq::Minimq<'a, Stack, Clock, Broker>,
handlers: &'a mut [HandlerSlot<'a, Context, E>],
) -> Result<Self, Error<Stack::Error>> {
// Note(unwrap): The user must provide a prefix of the correct size.
let mut prefix: String<MAX_TOPIC_LENGTH> = String::new();
write!(&mut prefix, "{}/command", device_prefix).map_err(|_| Error::PrefixTooLong)?;

Ok(Self {
machine: sm::StateMachine::new(MinireqContext {
handlers,
handlers: [None; SLOTS],
mqtt,
prefix,
}),
})
}
}

impl<'a, Context, Stack, Clock, Broker, E> Minireq<'a, Context, Stack, Clock, Broker, E>
impl<'a, Stack, Clock, Broker> Minireq<'a, Stack, Clock, Broker>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
Broker: minimq::broker::Broker,
E: core::fmt::Display,
{
/// Associate a handler to be called when receiving the specified request.
///
/// # Args
/// * `topic` - The request to register the provided handler with.
/// * `handler` - The handler function to be called when the request occurs.
pub fn register(
&mut self,
topic: &'a str,
handler: Handler<Context, E>,
) -> Result<(), Error<Stack::Error>> {
/// * `topic` - The command topic to subscribe to. This is appended to the string
/// `<prefix>/command/`.
pub fn subscribe(&mut self, topic: &'a str) -> Result<(), Error<Stack::Error>> {
let mut added = false;
for slot in self.machine.context_mut().handlers.iter_mut() {
if let Some((handle, _handler)) = &slot {
if handle == &topic {
return Err(Error::RegisterFailed);
return Err(Error::SubscribeFailed);
}
}

if slot.is_none() {
slot.replace((
topic,
HandlerMeta {
handler,
republished: false,
},
));
slot.replace((topic, HandlerMeta { republished: false }));
added = true;
break;
}
}

if !added {
return Err(Error::RegisterFailed);
return Err(Error::SubscribeFailed);
}

// Force a republish of the newly-registered command after adding it. We ignore failures of
// Force a republish of the newly-subscribed command after adding it. We ignore failures of
// event processing here since that would imply we are adding the handler before we've even
// gotten to the republish state.
self.machine
Expand All @@ -253,9 +215,10 @@ where
Ok(())
}

fn _handle_mqtt<F>(&mut self, mut f: F) -> Result<(), Error<Stack::Error>>
fn _handle_mqtt<E, F>(&mut self, mut f: F) -> Result<(), Error<Stack::Error>>
where
F: FnMut(Handler<Context, E>, &str, &[u8], &mut [u8]) -> Result<usize, E>,
F: FnMut(&str, &[u8], &mut [u8]) -> Result<usize, E>,
E: core::fmt::Display,
{
let MinireqContext {
handlers,
Expand All @@ -272,13 +235,10 @@ where

// For paths, we do not want to include the leading slash.
let path = path.strip_prefix('/').unwrap_or(path);
// Perform the action
let Some(meta) = handlers.iter().find_map(|x| {
x.as_ref().and_then(
|(handle, handler)| if handle == &path { Some(handler) } else { None },
)
}) else {
if let Ok(response) = minimq::Publication::new("No registered handler")

let found_handler = handlers.iter().flatten().any(|handler| handler.0 == path);
if !found_handler {
if let Ok(response) = minimq::Publication::new("Command is not subscribed")
.properties(&[ResponseCode::Error.to_user_property()])
.reply(properties)
.qos(QoS::AtLeastOnce)
Expand All @@ -294,12 +254,11 @@ where
// when attempting transmission.
let props = [ResponseCode::Ok.to_user_property()];

let Ok(response) =
minimq::DeferredPublication::new(|buf| f(meta.handler, path, message, buf))
.reply(properties)
.properties(&props)
.qos(QoS::AtLeastOnce)
.finish()
let Ok(response) = minimq::DeferredPublication::new(|buf| f(path, message, buf))
.reply(properties)
.properties(&props)
.qos(QoS::AtLeastOnce)
.finish()
else {
warn!("No response topic was provided with request: `{}`", path);
return;
Expand Down Expand Up @@ -337,15 +296,12 @@ where
/// Poll the request/response interface.
///
/// # Args
/// * `f` - A function that will be called with the provided handler, command, data, and
/// response buffer. This function is responsible for calling the handler with the necessary
/// context.
///
/// # Note
/// Any incoming requests will be automatically handled using provided handlers.
pub fn poll<F>(&mut self, f: F) -> Result<(), Error<Stack::Error>>
/// * `f` - A function that will be called with the incoming request topic, data, and
/// response buffer. This function is responsible for calling the handler.
pub fn poll<E, F>(&mut self, f: F) -> Result<(), Error<Stack::Error>>
where
F: FnMut(Handler<Context, E>, &str, &[u8], &mut [u8]) -> Result<usize, E>,
F: FnMut(&str, &[u8], &mut [u8]) -> Result<usize, E>,
E: core::fmt::Display,
{
if !self.machine.context_mut().mqtt.client().is_connected() {
// Note(unwrap): It's always safe to unwrap the reset event. All states must handle it.
Expand All @@ -358,25 +314,23 @@ where
}
}

struct MinireqContext<'a, Context, Stack, Clock, Broker, E>
struct MinireqContext<'a, Stack, Clock, Broker, const SLOTS: usize = 10>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
Broker: minimq::broker::Broker,
E: core::fmt::Display,
{
handlers: &'a mut [HandlerSlot<'a, Context, E>],
handlers: [HandlerSlot<'a>; SLOTS],
mqtt: minimq::Minimq<'a, Stack, Clock, Broker>,
prefix: String<MAX_TOPIC_LENGTH>,
}

impl<'a, Context, Stack, Clock, Broker, E> sm::StateMachineContext
for MinireqContext<'a, Context, Stack, Clock, Broker, E>
impl<'a, Stack, Clock, Broker, const SLOTS: usize> sm::StateMachineContext
for MinireqContext<'a, Stack, Clock, Broker, SLOTS>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
Broker: minimq::broker::Broker,
E: core::fmt::Display,
{
/// Reset the republish state of all of the handlers.
fn reset(&mut self) {
Expand Down
23 changes: 12 additions & 11 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,23 @@ async fn main() {
);

// Construct a settings configuration interface.
let mut handlers = [None; 10];
let mut interface: minireq::Minireq<bool, _, _, minimq::broker::IpBroker> =
minireq::Minireq::new("minireq/integration/device", mqtt, &mut handlers[..]).unwrap();

interface
.register("test", |exit, _req, _data, _out_buffer| {
*exit = true;
Ok(0)
})
.unwrap();
let mut interface: minireq::Minireq<_, _, minimq::broker::IpBroker> =
minireq::Minireq::new("minireq/integration/device", mqtt).unwrap();

interface.subscribe("test").unwrap();

// Update the client until the exit
let mut should_exit = false;

while !should_exit {
interface
.poll(|handler, topic, data, buffer| handler(&mut should_exit, topic, data, buffer))
.poll(|topic, _data, _buffer| match topic {
"test" => {
should_exit = true;
Ok(0)
}
_ => Err("Unknown handler"),
})
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
Expand Down

0 comments on commit ab28a95

Please sign in to comment.