Skip to content

Commit

Permalink
Going whole hog on implementing service servers as well, just so I ca…
Browse files Browse the repository at this point in the history
…n test service clients how I want to
  • Loading branch information
Carter committed Jun 26, 2024
1 parent b183744 commit 2e08c3e
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 9 deletions.
4 changes: 4 additions & 0 deletions assets/ros1_test_msgs/srv/RoundTripArray.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Purpose of this array is send and receive a large payload
uint8[] bytes
---
uint8[] bytes
2 changes: 2 additions & 0 deletions roslibrust/src/ros1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ mod service_client;
pub use service_client::ServiceClient;
mod subscriber;
pub use subscriber::Subscriber;
mod service_server;
pub use service_server::ServiceServer;
mod tcpros;
105 changes: 102 additions & 3 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
names::Name,
node::{XmlRpcServer, XmlRpcServerHandle},
publisher::Publication,
service_client::{CallServiceRequest, ServiceServerLink},
service_client::{CallServiceRequest, ServiceClientLink},
subscriber::Subscription,
MasterClient, NodeError, ProtocolParams,
},
Expand All @@ -14,6 +14,12 @@ use roslibrust_codegen::{RosMessageType, RosServiceType};
use std::{collections::HashMap, io, net::Ipv4Addr, sync::Arc};
use tokio::sync::{broadcast, mpsc, oneshot};

// Carter TODO:
// I kinda hate this entire Msg based abstraction internal to the server
// Why isn't this just a regular async function call?
// I feel like someone was afraid of deadlocks or didn't know how to mutex safely?
// We should be able to just call the function and get a result back instead of doing
// this odd message passing indirection?
#[derive(Debug)]
pub enum NodeMsg {
GetMasterUri {
Expand Down Expand Up @@ -56,6 +62,13 @@ pub enum NodeMsg {
srv_definition: String,
md5sum: String,
},
RegisterServiceServer {
reply: oneshot::Sender<Result<(), String>>,
service: Name,
service_type: String,
srv_definition: String,
md5sum: String,
},
RequestTopic {
reply: oneshot::Sender<Result<ProtocolParams, String>>,
caller_id: String,
Expand Down Expand Up @@ -120,11 +133,17 @@ impl NodeServerHandle {
.send(NodeMsg::SetPeerPublishers { topic, publishers })?)
}

/// Informs the underlying node server to shutdown
/// This will stop all ROS functionality and poison all NodeHandles connected
/// to the underlying node server.
// TODO this function should probably be pub(crate) and not pub?
pub fn shutdown(&self) -> Result<(), NodeError> {
self.node_server_sender.send(NodeMsg::Shutdown)?;
Ok(())
}

/// Registers a publisher with the underlying node server
/// Returns a channel that the raw bytes of a publish can be shoved into to queue the publish
pub async fn register_publisher<T: RosMessageType>(
&self,
topic: &str,
Expand All @@ -145,6 +164,9 @@ impl NodeServerHandle {
})?)
}

/// Registers a service client with the underlying node server
/// This returns a channel that can be used for making service calls
/// service calls will be queued in the channel and resolved when able.
pub async fn register_service_client<T: RosServiceType>(
&self,
service_name: &Name,
Expand All @@ -163,18 +185,55 @@ impl NodeServerHandle {
),
md5sum: T::MD5SUM.to_owned(),
})?;
// Get a channel back from the node server for pushing requests into
let received = receiver.await?;
Ok(received.map_err(|err| {
log::error!("Failed to register service client: {err}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}

pub async fn register_service_server<T, F>(
&self,
service_name: &Name,
server: F,
) -> Result<(), NodeError>
where
T: RosServiceType,
F: Fn(T::Request) -> Result<T::Response, Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync
+ 'static,
{
let (sender, receiver) = oneshot::channel();
self.node_server_sender
.send(NodeMsg::RegisterServiceServer {
reply: sender,
service: service_name.to_owned(),
service_type: T::ROS_SERVICE_NAME.to_owned(),
srv_definition: String::from_iter(
[T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter(),
),
md5sum: T::MD5SUM.to_owned(),
})?;
let received = receiver.await?;
Ok(received.map_err(|err| {
log::error!("Failed to register service server: {err}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}

/// Registers a subscription with the underlying node server
/// If this is the first time the given topic has been subscribed to (by this node)
/// rosmaster will be informed.
/// Otherwise, a new rx handle will simply be returned to the existing channel.
pub async fn register_subscriber<T: RosMessageType>(
&self,
topic: &str,
queue_size: usize,
) -> Result<broadcast::Receiver<Vec<u8>>, NodeError> {
// Type here is complicated, this is a channel that we're sending a channel receiver over
// This channel is used to fire back the receiver of the underlying subscription
let (sender, receiver) = oneshot::channel();
self.node_server_sender.send(NodeMsg::RegisterSubscriber {
reply: sender,
Expand All @@ -191,6 +250,11 @@ impl NodeServerHandle {
})?)
}

// This function provides functionality for the Node's XmlRPC server
// When an XmlRpc request for "requestTopic" comes in the xmlrpc server for the node calls this function
// to marshal the response.
// Users can call this function, but it really doesn't serve much of a purpose outside ROS Pub/Sub communication
// negotiation
pub async fn request_topic(
&self,
caller_id: &str,
Expand All @@ -214,6 +278,7 @@ impl NodeServerHandle {

/// Represents a single "real" node, typically only one of these is expected per process
/// but nothing should specifically prevent that.
/// This is sometimes referred to as the NodeServer in the documentation, many NodeHandles can point to one NodeServer
pub(crate) struct Node {
// The xmlrpc client this node uses to make requests to master
client: MasterClient,
Expand All @@ -226,7 +291,9 @@ pub(crate) struct Node {
// Record of subscriptions this node has
subscriptions: HashMap<String, Subscription>,
// Map of topic names to the service client handles for each topic
service_clients: HashMap<String, ServiceServerLink>,
service_clients: HashMap<String, ServiceClientLink>,
// Map of topic names to service server handles for each topic
service_servers: HashMap<String, ServiceServerLink>,
// TODO need signal to shutdown xmlrpc server when node is dropped
host_addr: Ipv4Addr,
hostname: String,
Expand Down Expand Up @@ -382,6 +449,19 @@ impl Node {
.map_err(|err| err.to_string()),
);
}
NodeMsg::RegisterServiceServer {
reply,
service,
service_type,
srv_definition,
md5sum,
} => {
let _ = reply.send(
self.register_service_server(&service, &service_type, &srv_definition, &md5sum)
.await
.map_err(|err| err.to_string()),
);
}
NodeMsg::RequestTopic {
reply,
topic,
Expand Down Expand Up @@ -503,6 +583,9 @@ impl Node {
}
}

/// Checks the internal state of the NodeServer to see if it has a service client registered for this service already
/// If it does, it returns a Sender to the existing service client
/// Otherwise, it creates a new service client and returns a Sender to the new service client
async fn register_service_client(
&mut self,
service: &Name,
Expand Down Expand Up @@ -536,7 +619,7 @@ impl Node {
log::debug!("Creating new service client for {service}");
let service_uri = self.client.lookup_service(&service_name).await?;
log::debug!("Found service at {service_uri}");
let server_link = ServiceServerLink::new(
let server_link = ServiceClientLink::new(
&self.node_name,
&service_name,
service_type,
Expand All @@ -551,4 +634,20 @@ impl Node {
Ok(handle)
}
}

async fn register_service_server<F>(
&mut self,
service: &Name,
service_type: &str,
srv_definition: &str,
md5sum: &str,
) -> Result<(), Box<dyn std::error::Error>>
where
T: RosServiceType,
F: Fn(T::Request) -> Result<T::Response, Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync
+ 'static,
{
}
}
22 changes: 21 additions & 1 deletion roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::actor::{Node, NodeServerHandle};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
NodeError,
NodeError, ServiceServer,
},
RosLibRustResult,
};
Expand Down Expand Up @@ -84,4 +84,24 @@ impl NodeHandle {
.await?;
Ok(ServiceClient::new(&service_name, sender))
}

pub async fn advertise_service<T, F>(
&self,
service_name: &str,
server: F,
) -> Result<ServiceServer, NodeError>
where
T: roslibrust_codegen::RosServiceType,
F: Fn(T::Request) -> Result<T::Response, Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync
+ 'static,
{
let service_name = Name::new(service_name)?;
let _response = self
.inner
.register_service_server::<T, F>(&service_name, server)
.await?;
Ok(ServiceServer::new(service_name, self.clone()))
}
}
61 changes: 59 additions & 2 deletions roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ impl<T: RosServiceType> ServiceClient<T> {
}
}

pub struct ServiceServerLink {
pub struct ServiceClientLink {
service_type: String,
call_sender: mpsc::UnboundedSender<CallServiceRequest>,
_actor_task: ChildTask<()>,
}

impl ServiceServerLink {
impl ServiceClientLink {
pub async fn new(
node_name: &Name,
service_name: &str,
Expand Down Expand Up @@ -184,3 +184,60 @@ impl ServiceServerLink {
}
}
}

#[cfg(feature = "ros1_test")]
#[cfg(test)]
mod test {
use log::info;

use crate::{
ros1::{NodeError, NodeHandle},
RosLibRustError,
};

roslibrust_codegen_macro::find_and_generate_ros_messages!(
"assets/ros1_test_msgs",
"assets/ros1_common_interfaces"
);

// Some logic in the service client specifically for handling large payloads
// trying to intentionally exercise that path
#[test_log::test(tokio::test)]
async fn test_large_service_payload_client() {
let nh = NodeHandle::new(
"http://localhost:11311",
"test_large_service_payload_client",
)
.await
.unwrap();

info!("Starting service call");
let response = nh
.service_client::<test_msgs::RoundTripArray>("large_service_payload")
.await
.unwrap()
.call(&test_msgs::RoundTripArrayRequest {
bytes: vec![0; 1000000],
})
.await
.unwrap();
info!("Service call complete");
}

#[test_log::test(tokio::test)]
async fn error_on_unprovided_service() {
let nh = NodeHandle::new("http://localhost:11311", "error_on_unprovided_service")
.await
.unwrap();

let client = nh
.service_client::<test_msgs::RoundTripArray>("unprovided_service")
.await;
assert!(client.is_err());
// Note / TODO: this currently returns an IoError(Kind(ConnectionAborted))
// which is better than hanging, but not a good error type to return
if !matches!(client, Err(NodeError::IoError(_))) {
panic!("Unexpected error type");
}
}
}
30 changes: 30 additions & 0 deletions roslibrust/src/ros1/service_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use super::{names::Name, NodeHandle};

// TODO: someday I'd like to define a trait alias here for a ServerFunction
// Currently unstable:
// https://doc.rust-lang.org/beta/unstable-book/language-features/trait-alias.html
// trait ServerFunction<T> = Fn(T::Request) -> Err(T::Response, Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static;

/// ServiceServer is simply a lifetime control
/// The underlying ServiceServer is kept alive while object is kept alive.
/// Dropping this object, un-advertises the underlying service with rosmaster
pub struct ServiceServer {
service_name: Name,
node_handle: NodeHandle,
}

impl ServiceServer {
pub fn new(service_name: Name, node_handle: NodeHandle) -> Self {
Self {
service_name,
node_handle,
}
}
}

impl Drop for ServiceServer {
fn drop(&mut self) {
self.node_handle
.unadvertise_service(&self.service_name.to_string());
}
}
12 changes: 9 additions & 3 deletions roslibrust_codegen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,9 +596,15 @@ pub fn resolve_dependency_graph(
// Now that all messages are parsed, we can parse and resolve services
let mut resolved_services: Vec<_> = services
.into_iter()
.filter_map(|srv| ServiceFile::resolve(srv, &resolved_messages))
.collect();
resolved_services.sort_by(|a, b| a.parsed.name.cmp(&b.parsed.name));
.map(|srv| {
let name = srv.path.clone();
ServiceFile::resolve(srv, &resolved_messages).ok_or(Error::new(format!(
"Failed to correctly resolve service: {:?}",
&name
)))
})
.collect::<Result<Vec<_>, Error>>()?;
resolved_services.sort_by(|a: &ServiceFile, b: &ServiceFile| a.parsed.name.cmp(&b.parsed.name));

Ok((resolved_messages.into_values().collect(), resolved_services))
}
Expand Down

0 comments on commit 2e08c3e

Please sign in to comment.