Skip to content

Commit

Permalink
Merge pull request #151 from your-sudden-death/native-better-errors
Browse files Browse the repository at this point in the history
Better Error Handling for Native, First Steps
  • Loading branch information
Carter12s authored May 21, 2024
2 parents db419d5 + 30e064b commit 7b66397
Show file tree
Hide file tree
Showing 18 changed files with 175 additions and 140 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Code generated by roslibrust_codegen now depends only on libraries exposed by roslibrust_codegen. This means that
crates that were previously adding dependencies on serde, serde-big-array, and smart-default will no longer need to do so.
- A significant reworking of the error types in the ROS1 native client was performed to move away from the `Box<dyn Error + Send + Sync>` pattern and instead use the `anyhow` crate.

## 0.9.0 - May 13th, 2023

Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/basic_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in
/// pass messages between each other.
/// To run this example a rosbridge websocket server should be running at the deafult port (9090).
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
.without_timestamps() // required for running wsl2
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/calling_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in
/// As well as the rosapi node.
/// This node calls a service on the rosapi node to get the current ros time.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
.without_timestamps() // Required for running in wsl2
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/generic_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl RosMessageType for GenericHeader {

/// Sets up a subscriber that could get either of two versions of a message
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
.without_timestamps() // required for running in wsl2
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/native_ros1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
use roslibrust::ros1::NodeHandle;

simple_logger::SimpleLogger::new()
Expand Down
8 changes: 5 additions & 3 deletions roslibrust/examples/ros1_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in

#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
use roslibrust::ros1::NodeHandle;

simple_logger::SimpleLogger::new()
Expand All @@ -14,8 +14,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let nh = NodeHandle::new("http://localhost:11311", "listener_rs").await?;
let mut subscriber = nh.subscribe::<std_msgs::String>("/chatter", 1).await?;

while let Ok(msg) = subscriber.next().await {
log::info!("[/listener_rs] Got message: {}", msg.data);
while let Some(msg) = subscriber.next().await {
if let Ok(msg) = msg {
log::info!("[/listener_rs] Got message: {}", msg.data);
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/ros1_ros2_bridge_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod ros2 {
/// With this example running you should then be able to use the ros1 command line tools to publish a message,
/// and see them appear in ros2 with its command line tools
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
// Initialize a basic logger useful if anything goes wrong while running the example
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/ros1_talker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in

#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
use roslibrust::ros1::NodeHandle;

simple_logger::SimpleLogger::new()
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/service_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn my_service(
/// To actually exercise our service we need to call it with something.
/// One option would be to use a ros commaline tool: `rosservice call /my_set_bool "data: false"` or `ros2 service call /my_set_bool std_srvs/srv/SetBool data:\ false\ `.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
// Initialize a basic logger useful if anything goes wrong while running the example
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/subscribe_and_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in
/// This example will log received messages if run at the same time as "basic_publisher".
/// A running rosbridge websocket server at the default port (9090) is required to run this example.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn main() -> Result<(), anyhow::Error> {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
.without_timestamps() // required for running in wsl2
Expand Down
9 changes: 6 additions & 3 deletions roslibrust/src/ros1/names.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::{RosLibRustError, RosLibRustResult};
use std::fmt::Display;

lazy_static::lazy_static! {
Expand All @@ -11,12 +10,12 @@ pub struct Name {
}

impl Name {
pub fn new(name: impl Into<String>) -> RosLibRustResult<Self> {
pub fn new(name: impl Into<String>) -> Result<Name, InvalidNameError> {
let name: String = name.into();
if is_valid(&name) {
Ok(Self { inner: name })
} else {
Err(RosLibRustError::InvalidName(name))
Err(InvalidNameError(name))
}
}

Expand Down Expand Up @@ -126,3 +125,7 @@ mod tests {
);
}
}

#[derive(thiserror::Error, Debug)]
#[error("Invalid Name: {0}")]
pub struct InvalidNameError(String);
136 changes: 50 additions & 86 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::ProtocolParams;
use super::{NodeError, ProtocolParams};
use crate::{
ros1::{
names::Name,
Expand All @@ -11,7 +11,7 @@ use crate::{
};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use std::{collections::HashMap, net::Ipv4Addr, sync::Arc};
use std::{collections::HashMap, io, net::Ipv4Addr, sync::Arc};
use tokio::sync::{broadcast, mpsc, oneshot};

#[derive(Debug)]
Expand Down Expand Up @@ -68,56 +68,37 @@ pub(crate) struct NodeServerHandle {

impl NodeServerHandle {
/// Get the URI of the master node.
pub async fn get_master_uri(&self) -> Result<String, Box<dyn std::error::Error>> {
pub async fn get_master_uri(&self) -> Result<String, NodeError> {
let (sender, receiver) = oneshot::channel();
match self
.node_server_sender
.send(NodeMsg::GetMasterUri { reply: sender })
{
Ok(()) => Ok(receiver.await.map_err(|err| Box::new(err))?),
Err(e) => Err(Box::new(e)),
}
self.node_server_sender
.send(NodeMsg::GetMasterUri { reply: sender })?;
Ok(receiver.await?)
}

pub async fn get_client_uri(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
/// Get the URI of the client node.
pub async fn get_client_uri(&self) -> Result<String, NodeError> {
let (sender, receiver) = oneshot::channel();
match self
.node_server_sender
.send(NodeMsg::GetClientUri { reply: sender })
{
Ok(()) => Ok(receiver.await.map_err(|err| Box::new(err))?),
Err(e) => Err(Box::new(e)),
}
self.node_server_sender
.send(NodeMsg::GetClientUri { reply: sender })?;
Ok(receiver.await?)
}

/// Gets the list of topics the node is currently subscribed to.
/// Returns a tuple of (Topic Name, Topic Type) e.g. ("/rosout", "rosgraph_msgs/Log").
pub async fn get_subscriptions(
&self,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
pub async fn get_subscriptions(&self) -> Result<Vec<(String, String)>, NodeError> {
let (sender, receiver) = oneshot::channel();
match self
.node_server_sender
.send(NodeMsg::GetSubscriptions { reply: sender })
{
Ok(()) => Ok(receiver.await.map_err(|err| Box::new(err))?),
Err(e) => Err(Box::new(e)),
}
self.node_server_sender
.send(NodeMsg::GetSubscriptions { reply: sender })?;
Ok(receiver.await?)
}

/// Gets the list of topic the node is currently publishing to.
/// Returns a tuple of (Topic Name, Topic Type) e.g. ("/rosout", "rosgraph_msgs/Log").
pub async fn get_publications(
&self,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
pub async fn get_publications(&self) -> Result<Vec<(String, String)>, NodeError> {
let (sender, receiver) = oneshot::channel();
match self
.node_server_sender
.send(NodeMsg::GetPublications { reply: sender })
{
Ok(()) => Ok(receiver.await.map_err(|err| Box::new(err))?),
Err(e) => Err(Box::new(e)),
}
self.node_server_sender
.send(NodeMsg::GetPublications { reply: sender })?;
Ok(receiver.await?)
}

/// Updates the list of know publishers for a given topic
Expand All @@ -126,93 +107,76 @@ impl NodeServerHandle {
&self,
topic: String,
publishers: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), NodeError> {
Ok(self
.node_server_sender
.send(NodeMsg::SetPeerPublishers { topic, publishers })
.map_err(|err| Box::new(err))?)
.send(NodeMsg::SetPeerPublishers { topic, publishers })?)
}

pub fn shutdown(&self) -> Result<(), Box<dyn std::error::Error>> {
self.node_server_sender
.send(NodeMsg::Shutdown)
.map_err(|err| Box::new(err))?;
pub fn shutdown(&self) -> Result<(), NodeError> {
self.node_server_sender.send(NodeMsg::Shutdown)?;
Ok(())
}

pub async fn register_publisher<T: RosMessageType>(
&self,
topic: &str,
queue_size: usize,
) -> Result<mpsc::Sender<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
) -> Result<mpsc::Sender<Vec<u8>>, NodeError> {
let (sender, receiver) = oneshot::channel();
match self.node_server_sender.send(NodeMsg::RegisterPublisher {
self.node_server_sender.send(NodeMsg::RegisterPublisher {
reply: sender,
topic: topic.to_owned(),
topic_type: T::ROS_TYPE_NAME.to_owned(),
queue_size,
msg_definition: T::DEFINITION.to_owned(),
md5sum: T::MD5SUM.to_owned(),
}) {
Ok(()) => {
let received = receiver.await.map_err(|err| Box::new(err))?;
Ok(received.map_err(|_err| {
Box::new(std::io::Error::from(std::io::ErrorKind::ConnectionAborted))
})?)
}
Err(err) => Err(Box::new(err)),
}
})?;
let received = receiver.await?;
Ok(received.map_err(|_err| {
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}

pub async fn register_subscriber<T: RosMessageType>(
&self,
topic: &str,
queue_size: usize,
) -> Result<broadcast::Receiver<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
) -> Result<broadcast::Receiver<Vec<u8>>, NodeError> {
let (sender, receiver) = oneshot::channel();
match self.node_server_sender.send(NodeMsg::RegisterSubscriber {
self.node_server_sender.send(NodeMsg::RegisterSubscriber {
reply: sender,
topic: topic.to_owned(),
topic_type: T::ROS_TYPE_NAME.to_owned(),
queue_size,
msg_definition: T::DEFINITION.to_owned(),
md5sum: T::MD5SUM.to_owned(),
}) {
Ok(()) => {
let received = receiver.await.map_err(|err| Box::new(err))?;
Ok(received.map_err(|err| {
log::error!("Failed to register subscriber: {err}");
Box::new(std::io::Error::from(std::io::ErrorKind::ConnectionAborted))
})?)
}
Err(err) => Err(Box::new(err)),
}
})?;
let received = receiver.await?;
Ok(received.map_err(|err| {
log::error!("Failed to register subscriber: {err}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}

pub async fn request_topic(
&self,
caller_id: &str,
topic: &str,
protocols: &[String],
) -> Result<ProtocolParams, Box<dyn std::error::Error>> {
) -> Result<ProtocolParams, NodeError> {
let (sender, receiver) = oneshot::channel();
match self.node_server_sender.send(NodeMsg::RequestTopic {
self.node_server_sender.send(NodeMsg::RequestTopic {
caller_id: caller_id.to_owned(),
topic: topic.to_owned(),
protocols: protocols.into(),
reply: sender,
}) {
Ok(()) => {
let received = receiver.await.map_err(|err| Box::new(err))?;
Ok(received.map_err(|err| {
log::error!(
"Fail to coordinate channel between publisher and subscriber: {err}"
);
Box::new(std::io::Error::from(std::io::ErrorKind::AddrNotAvailable))
})?)
}
Err(e) => Err(Box::new(e)),
}
})?;
let received = receiver.await?;
Ok(received.map_err(|err| {
log::error!("Fail to coordinate channel between publisher and subscriber: {err}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}
}

Expand Down Expand Up @@ -243,7 +207,7 @@ impl Node {
hostname: &str,
node_name: &str,
addr: Ipv4Addr,
) -> Result<NodeServerHandle, Box<dyn std::error::Error + Send + Sync>> {
) -> Result<NodeServerHandle, NodeError> {
let (node_sender, node_receiver) = mpsc::unbounded_channel();
let xml_server_handle = NodeServerHandle {
node_server_sender: node_sender.clone(),
Expand Down Expand Up @@ -421,7 +385,7 @@ impl Node {
queue_size: usize,
msg_definition: &str,
md5sum: &str,
) -> Result<broadcast::Receiver<Vec<u8>>, Box<dyn std::error::Error>> {
) -> Result<broadcast::Receiver<Vec<u8>>, NodeError> {
match self.subscriptions.iter().find(|(key, _)| *key == topic) {
Some((_topic, subscription)) => Ok(subscription.get_receiver()),
None => {
Expand Down Expand Up @@ -453,14 +417,14 @@ impl Node {
queue_size: usize,
msg_definition: String,
md5sum: String,
) -> Result<mpsc::Sender<Vec<u8>>, Box<dyn std::error::Error>> {
) -> Result<mpsc::Sender<Vec<u8>>, NodeError> {
let existing_entry = {
self.publishers.iter().find_map(|(key, value)| {
if key.as_str() == &topic {
if value.topic_type() == topic_type {
Some(Ok(value.get_sender()))
} else {
Some(Err(Box::new(std::io::Error::from(
Some(Err(NodeError::IoError(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
}
Expand Down
Loading

0 comments on commit 7b66397

Please sign in to comment.