Skip to content

Commit

Permalink
Finish the implementation with a bit tacked on
Browse files Browse the repository at this point in the history
  • Loading branch information
ssnover committed Dec 20, 2023
1 parent b347579 commit 2764c13
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 74 deletions.
1 change: 1 addition & 0 deletions roslibrust/src/ros1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use node::*;
mod publisher;
pub use publisher::Publisher;
mod service_client;
pub use service_client::ServiceClient;
mod subscriber;
pub use subscriber::Subscriber;
mod tcpros;
8 changes: 4 additions & 4 deletions roslibrust/src/ros1/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ impl Name {
}
}

fn is_valid(name: &str) -> bool {
GRAPH_NAME_REGEX.is_match(name)
}

impl Display for Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.fmt(f)
}
}

fn is_valid(name: &str) -> bool {
GRAPH_NAME_REGEX.is_match(name)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
118 changes: 105 additions & 13 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::ros1::{
names::Name,
node::{XmlRpcServer, XmlRpcServerHandle},
publisher::Publication,
service_client::ServiceServerLink,
subscriber::Subscription,
MasterClient, ProtocolParams,
use crate::{
ros1::{
names::Name,
node::{XmlRpcServer, XmlRpcServerHandle},
publisher::Publication,
service_client::{CallServiceRequest, ServiceServerLink},
subscriber::Subscription,
MasterClient, ProtocolParams,
},
RosLibRustError, RosLibRustResult,
};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use roslibrust_codegen::{RosMessageType, RosServiceType};
use std::{collections::HashMap, net::Ipv4Addr, sync::Arc};
use tokio::sync::{broadcast, mpsc, oneshot};

Expand Down Expand Up @@ -46,6 +49,13 @@ pub enum NodeMsg {
msg_definition: String,
md5sum: String,
},
RegisterServiceClient {
reply: oneshot::Sender<Result<mpsc::UnboundedSender<CallServiceRequest>, String>>,
service: Name,
service_type: String,
srv_definition: String,
md5sum: String,
},
RequestTopic {
reply: oneshot::Sender<Result<ProtocolParams, String>>,
caller_id: String,
Expand Down Expand Up @@ -161,6 +171,33 @@ impl NodeServerHandle {
}
}

pub async fn register_service_client<T: RosServiceType>(
&self,
service_name: &Name,
) -> RosLibRustResult<mpsc::UnboundedSender<CallServiceRequest>> {
let (sender, receiver) = oneshot::channel();
match self
.node_server_sender
.send(NodeMsg::RegisterServiceClient {
reply: sender,
service: service_name.to_owned(),
service_type: T::ROS_SERVICE_NAME.to_owned(),
srv_definition: String::from_iter(
[T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter(),
),
md5sum: T::MD5SUM.to_owned(),
}) {
Ok(()) => Ok(receiver
.await
.map_err(|_err| RosLibRustError::Disconnected)?
.map_err(|err| {
log::error!("Unable to register service client: {err}");
RosLibRustError::Disconnected
})?),
Err(_err) => Err(RosLibRustError::Disconnected),
}
}

pub async fn register_subscriber<T: RosMessageType>(
&self,
topic: &str,
Expand Down Expand Up @@ -231,14 +268,14 @@ pub(crate) struct Node {
// TODO need signal to shutdown xmlrpc server when node is dropped
host_addr: Ipv4Addr,
hostname: String,
node_name: String,
node_name: Name,
}

impl Node {
pub(crate) async fn new(
master_uri: &str,
hostname: &str,
node_name: &str,
node_name: &Name,
addr: Ipv4Addr,
) -> Result<NodeServerHandle, Box<dyn std::error::Error + Send + Sync>> {
let (node_sender, node_receiver) = mpsc::unbounded_channel();
Expand All @@ -251,9 +288,8 @@ impl Node {
let xmlrpc_server = XmlRpcServer::new(addr, xml_server_handle)?;
let client_uri = format!("http://{hostname}:{}", xmlrpc_server.port());

let _ = Name::new(node_name)?;

let rosmaster_client = MasterClient::new(master_uri, client_uri, node_name).await?;
let rosmaster_client =
MasterClient::new(master_uri, client_uri, node_name.to_string()).await?;
let mut node = Self {
client: rosmaster_client,
_xmlrpc_server: xmlrpc_server,
Expand Down Expand Up @@ -371,6 +407,19 @@ impl Node {
.map_err(|err| err.to_string()),
);
}
NodeMsg::RegisterServiceClient {
reply,
service,
service_type,
srv_definition,
md5sum,
} => {
let _ = reply.send(
self.register_service_client(&service, &service_type, &srv_definition, &md5sum)
.await
.map_err(|err| err.to_string()),
);
}
NodeMsg::RequestTopic {
reply,
topic,
Expand Down Expand Up @@ -491,4 +540,47 @@ impl Node {
Ok(handle)
}
}

async fn register_service_client(
&mut self,
service: &Name,
service_type: &str,
srv_definition: &str,
md5sum: &str,
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, Box<dyn std::error::Error>> {
let service_name = service.resolve_to_global(&self.node_name).to_string();
let existing_entry = {
self.service_clients.iter().find_map(|(key, value)| {
if key.as_str() == &service_name {
if value.service_type() == service_type {
Some(Ok(value.get_sender()))
} else {
Some(Err(Box::new(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
}
} else {
None
}
})
};

if let Some(handle) = existing_entry {
Ok(handle?)
} else {
let service_uri = self.client.lookup_service(&service_name).await?;
let server_link = ServiceServerLink::new(
&self.node_name,
&service_name,
service_type,
&service_uri,
srv_definition,
md5sum,
)
.await?;
let handle = server_link.get_sender();
self.service_clients.insert(service_name, server_link);
Ok(handle)
}
}
}
22 changes: 20 additions & 2 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use super::actor::{Node, NodeServerHandle};
use crate::ros1::{publisher::Publisher, subscriber::Subscriber};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
},
RosLibRustResult,
};

/// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc.
/// This class provides the user facing API for interacting with ROS.
Expand All @@ -17,10 +22,11 @@ impl NodeHandle {
master_uri: &str,
name: &str,
) -> Result<NodeHandle, Box<dyn std::error::Error + Send + Sync>> {
let name = Name::new(name)?;
// Follow ROS rules and determine our IP and hostname
let (addr, hostname) = super::determine_addr().await?;

let node = Node::new(master_uri, &hostname, name, addr).await?;
let node = Node::new(master_uri, &hostname, &name, addr).await?;
let nh = NodeHandle { inner: node };

Ok(nh)
Expand Down Expand Up @@ -57,4 +63,16 @@ impl NodeHandle {
.await?;
Ok(Subscriber::new(receiver))
}

pub async fn service_client<T: roslibrust_codegen::RosServiceType>(
&self,
service_name: &str,
) -> RosLibRustResult<ServiceClient<T>> {
let service_name = Name::new(service_name)?;
let sender = self
.inner
.register_service_client::<T>(&service_name)
.await?;
Ok(ServiceClient::new(&service_name, sender))
}
}
9 changes: 6 additions & 3 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{ros1::tcpros::ConnectionHeader, RosLibRustError};
use crate::{
ros1::{names::Name, tcpros::ConnectionHeader},
RosLibRustError,
};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use std::{
Expand Down Expand Up @@ -46,7 +49,7 @@ pub struct Publication {

impl Publication {
pub async fn new(
node_name: &str,
node_name: &Name,
latching: bool,
topic_name: &str,
host_addr: Ipv4Addr,
Expand All @@ -62,7 +65,7 @@ impl Publication {
let (sender, mut receiver) = mpsc::channel::<Vec<u8>>(queue_size);

let responding_conn_header = ConnectionHeader {
caller_id: node_name.to_owned(),
caller_id: node_name.to_string(),
latching,
msg_definition: msg_definition.to_owned(),
md5sum: md5sum.to_owned(),
Expand Down
Loading

0 comments on commit 2764c13

Please sign in to comment.