Skip to content

Commit

Permalink
Finish the implementation with a bit tacked on
Browse files Browse the repository at this point in the history
  • Loading branch information
ssnover committed Dec 19, 2023
1 parent 9dd8f34 commit f184540
Show file tree
Hide file tree
Showing 17 changed files with 287 additions and 122 deletions.
2 changes: 1 addition & 1 deletion roslibrust/examples/native_ros1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use roslibrust::NodeHandle;
use roslibrust::ros1::NodeHandle;

simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/ros1_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in
#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use roslibrust::NodeHandle;
use roslibrust::ros1::NodeHandle;

simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/examples/ros1_talker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in
#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use roslibrust::NodeHandle;
use roslibrust::ros1::NodeHandle;

simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
Expand Down
37 changes: 34 additions & 3 deletions roslibrust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,37 @@ pub use rosbridge::*;
pub mod rosapi;

#[cfg(feature = "ros1")]
mod ros1;
#[cfg(feature = "ros1")]
pub use ros1::*;
pub mod ros1;

/// For now starting with a central error type, may break this up more in future
#[derive(thiserror::Error, Debug)]
pub enum RosLibRustError {
#[error("Not currently connected to ros master / bridge")]
Disconnected,
// TODO we probably want to eliminate tungstenite from this and hide our
// underlying websocket implementation from the API
// currently we "technically" break the API when we change tungstenite verisons
#[error("Websocket communication error: {0}")]
CommFailure(#[from] tokio_tungstenite::tungstenite::Error),
#[error("Operation timed out: {0}")]
Timeout(#[from] tokio::time::error::Elapsed),
#[error("Failed to parse message from JSON: {0}")]
InvalidMessage(#[from] serde_json::Error),
#[error("TCPROS serialization error: {0}")]
SerializationError(String),
#[error("Rosbridge server reported an error: {0}")]
ServerError(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Name does not meet ROS requirements: {0}")]
InvalidName(String),
// Generic catch-all error type for not-yet-handled errors
// TODO ultimately this type will be removed from API of library
#[error(transparent)]
Unexpected(#[from] anyhow::Error),
}

/// Generic result type used as standard throughout library.
/// Note: many functions which currently return this will be updated to provide specific error
/// types in the future instead of the generic error here.
pub type RosLibRustResult<T> = Result<T, RosLibRustError>;
3 changes: 3 additions & 0 deletions roslibrust/src/ros1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ mod node;
pub use node::*;

mod publisher;
pub use publisher::Publisher;
mod service_client;
pub use service_client::ServiceClient;
mod subscriber;
pub use subscriber::Subscriber;
mod tcpros;
16 changes: 13 additions & 3 deletions roslibrust/src/ros1/names.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::fmt::Display;

use crate::{RosLibRustError, RosLibRustResult};

lazy_static::lazy_static! {
static ref GRAPH_NAME_REGEX: regex::Regex = regex::Regex::new(r"^([/~a-zA-Z]){1}([a-zA-Z0-9_/])*([A-z0-9_])$").unwrap();
}
Expand All @@ -8,12 +12,12 @@ pub struct Name {
}

impl Name {
pub fn new(name: impl Into<String>) -> Option<Self> {
pub fn new(name: impl Into<String>) -> RosLibRustResult<Self> {
let name: String = name.into();
if is_valid(&name) {
Some(Self { inner: name })
Ok(Self { inner: name })
} else {
None
Err(RosLibRustError::InvalidName(name))
}
}

Expand Down Expand Up @@ -50,6 +54,12 @@ impl Name {
}
}

impl Display for Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.fmt(f)
}
}

fn is_valid(name: &str) -> bool {
GRAPH_NAME_REGEX.is_match(name)
}
Expand Down
114 changes: 99 additions & 15 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use crate::{
names::Name,
node::{XmlRpcServer, XmlRpcServerHandle},
publisher::Publication,
service_client::ServiceServerLink,
service_client::{CallServiceRequest, ServiceServerLink},
subscriber::Subscription,
MasterClient,
},
MasterClient,
RosLibRustError, RosLibRustResult,
};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use roslibrust_codegen::{RosMessageType, RosServiceType};
use std::{collections::HashMap, net::Ipv4Addr, sync::Arc};
use tokio::sync::{broadcast, mpsc, oneshot};

Expand Down Expand Up @@ -49,6 +50,13 @@ pub enum NodeMsg {
msg_definition: String,
md5sum: String,
},
RegisterServiceClient {
reply: oneshot::Sender<Result<mpsc::UnboundedSender<CallServiceRequest>, String>>,
service: Name,
service_type: String,
srv_definition: String,
md5sum: String,
},
RequestTopic {
reply: oneshot::Sender<Result<ProtocolParams, String>>,
caller_id: String,
Expand Down Expand Up @@ -143,14 +151,13 @@ impl NodeServerHandle {
pub async fn register_publisher<T: RosMessageType>(
&self,
topic: &str,
topic_type: &str,
queue_size: usize,
) -> Result<mpsc::Sender<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
let (sender, receiver) = oneshot::channel();
match self.node_server_sender.send(NodeMsg::RegisterPublisher {
reply: sender,
topic: topic.to_owned(),
topic_type: topic_type.to_owned(),
topic_type: T::ROS_TYPE_NAME.to_owned(),
queue_size,
msg_definition: T::DEFINITION.to_owned(),
md5sum: T::MD5SUM.to_owned(),
Expand All @@ -165,6 +172,33 @@ impl NodeServerHandle {
}
}

pub async fn register_service_client<T: RosServiceType>(
&self,
service_name: &Name,
) -> RosLibRustResult<mpsc::UnboundedSender<CallServiceRequest>> {
let (sender, receiver) = oneshot::channel();
match self
.node_server_sender
.send(NodeMsg::RegisterServiceClient {
reply: sender,
service: service_name.to_owned(),
service_type: T::ROS_SERVICE_NAME.to_owned(),
srv_definition: String::from_iter(
[T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter(),
),
md5sum: T::MD5SUM.to_owned(),
}) {
Ok(()) => Ok(receiver
.await
.map_err(|_err| RosLibRustError::Disconnected)?
.map_err(|err| {
log::error!("Unable to register service client: {err}");
RosLibRustError::Disconnected
})?),
Err(_err) => Err(RosLibRustError::Disconnected),
}
}

pub async fn register_subscriber<T: RosMessageType>(
&self,
topic: &str,
Expand Down Expand Up @@ -235,14 +269,14 @@ pub(crate) struct Node {
// TODO need signal to shutdown xmlrpc server when node is dropped
host_addr: Ipv4Addr,
hostname: String,
node_name: String,
node_name: Name,
}

impl Node {
pub(crate) async fn new(
master_uri: &str,
hostname: &str,
node_name: &str,
node_name: &Name,
addr: Ipv4Addr,
) -> Result<NodeServerHandle, Box<dyn std::error::Error + Send + Sync>> {
let (node_sender, node_receiver) = mpsc::unbounded_channel();
Expand All @@ -255,14 +289,8 @@ impl Node {
let xmlrpc_server = XmlRpcServer::new(addr, xml_server_handle)?;
let client_uri = format!("http://{hostname}:{}", xmlrpc_server.port());

if let None = Name::new(node_name) {
log::error!("Node name {node_name} is not valid");
return Err(Box::new(std::io::Error::from(
std::io::ErrorKind::InvalidInput,
)));
}

let rosmaster_client = MasterClient::new(master_uri, client_uri, node_name).await?;
let rosmaster_client =
MasterClient::new(master_uri, client_uri, node_name.to_string()).await?;
let mut node = Self {
client: rosmaster_client,
_xmlrpc_server: xmlrpc_server,
Expand Down Expand Up @@ -380,6 +408,19 @@ impl Node {
.map_err(|err| err.to_string()),
);
}
NodeMsg::RegisterServiceClient {
reply,
service,
service_type,
srv_definition,
md5sum,
} => {
let _ = reply.send(
self.register_service_client(&service, &service_type, &srv_definition, &md5sum)
.await
.map_err(|err| err.to_string()),
);
}
NodeMsg::RequestTopic {
reply,
topic,
Expand Down Expand Up @@ -500,4 +541,47 @@ impl Node {
Ok(handle)
}
}

async fn register_service_client(
&mut self,
service: &Name,
service_type: &str,
srv_definition: &str,
md5sum: &str,
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, Box<dyn std::error::Error>> {
let service_name = service.resolve_to_global(&self.node_name).to_string();
let existing_entry = {
self.service_clients.iter().find_map(|(key, value)| {
if key.as_str() == &service_name {
if value.service_type() == service_type {
Some(Ok(value.get_sender()))
} else {
Some(Err(Box::new(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
}
} else {
None
}
})
};

if let Some(handle) = existing_entry {
Ok(handle?)
} else {
let service_uri = self.client.lookup_service(&service_name).await?;
let server_link = ServiceServerLink::new(
&self.node_name,
&service_name,
service_type,
&service_uri,
srv_definition,
md5sum,
)
.await?;
let handle = server_link.get_sender();
self.service_clients.insert(service_name, server_link);
Ok(handle)
}
}
}
24 changes: 21 additions & 3 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use super::actor::{Node, NodeServerHandle};
use crate::ros1::{publisher::Publisher, subscriber::Subscriber};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
},
RosLibRustResult,
};

/// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc.
/// This class provides the user facing API for interacting with ROS.
Expand All @@ -17,10 +22,11 @@ impl NodeHandle {
master_uri: &str,
name: &str,
) -> Result<NodeHandle, Box<dyn std::error::Error + Send + Sync>> {
let name = Name::new(name)?;
// Follow ROS rules and determine our IP and hostname
let (addr, hostname) = super::determine_addr().await?;

let node = Node::new(master_uri, &hostname, name, addr).await?;
let node = Node::new(master_uri, &hostname, &name, addr).await?;
let nh = NodeHandle { inner: node };

Ok(nh)
Expand All @@ -41,7 +47,7 @@ impl NodeHandle {
) -> Result<Publisher<T>, Box<dyn std::error::Error + Send + Sync>> {
let sender = self
.inner
.register_publisher::<T>(topic_name, T::ROS_TYPE_NAME, queue_size)
.register_publisher::<T>(topic_name, queue_size)
.await?;
Ok(Publisher::new(topic_name, sender))
}
Expand All @@ -57,4 +63,16 @@ impl NodeHandle {
.await?;
Ok(Subscriber::new(receiver))
}

pub async fn service_client<T: roslibrust_codegen::RosServiceType>(
&self,
service_name: &str,
) -> RosLibRustResult<ServiceClient<T>> {
let service_name = Name::new(service_name)?;
let sender = self
.inner
.register_service_client::<T>(&service_name)
.await?;
Ok(ServiceClient::new(&service_name, sender))
}
}
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This module contains the top level Node and NodeHandle classes.
//! These wrap the lower level management of a ROS Node connection into a higher level and thread safe API.
use crate::RosMasterError;
use crate::ros1::RosMasterError;
use std::net::{IpAddr, Ipv4Addr};

mod actor;
Expand Down
11 changes: 6 additions & 5 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::RosLibRustError;

use super::tcpros::ConnectionHeader;
use crate::{
ros1::{names::Name, tcpros::ConnectionHeader},
RosLibRustError,
};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use std::{
Expand Down Expand Up @@ -48,7 +49,7 @@ pub struct Publication {

impl Publication {
pub async fn new(
node_name: &str,
node_name: &Name,
latching: bool,
topic_name: &str,
host_addr: Ipv4Addr,
Expand All @@ -64,7 +65,7 @@ impl Publication {
let (sender, mut receiver) = mpsc::channel::<Vec<u8>>(queue_size);

let responding_conn_header = ConnectionHeader {
caller_id: node_name.to_owned(),
caller_id: node_name.to_string(),
latching,
msg_definition: msg_definition.to_owned(),
md5sum: md5sum.to_owned(),
Expand Down
Loading

0 comments on commit f184540

Please sign in to comment.