Skip to content

Commit

Permalink
ROS1 publishers call unadvertise on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
carter committed Aug 4, 2024
1 parent dea65ab commit ed84f6b
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 59 deletions.
2 changes: 1 addition & 1 deletion roslibrust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ rosapi = ["serde-big-array"]
# Intended for use with tests, includes tests that rely on a locally running rosbridge
running_bridge = []
# For use with integration tests, indicating we are testing integration with a ros1 bridge
ros1_test = ["running_bridge"]
ros1_test = ["running_bridge", "ros1"]
# For use with integration tests, indicates we are testing integration with a ros2 bridge
ros2_test = ["running_bridge"]
# Provides access to experimental abstract trait topic_provider
Expand Down
135 changes: 94 additions & 41 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
RosLibRustError,
};
use abort_on_drop::ChildTask;
use log::warn;
use log::*;
use roslibrust_codegen::{RosMessageType, RosServiceType};
use std::{collections::HashMap, io, net::Ipv4Addr, sync::Arc};
use tokio::sync::{broadcast, mpsc, oneshot};
Expand Down Expand Up @@ -82,10 +82,13 @@ pub enum NodeMsg {
},
RequestTopic {
reply: oneshot::Sender<Result<ProtocolParams, String>>,
caller_id: String,
topic: String,
protocols: Vec<String>,
},
UnregisterPublisher {
reply: oneshot::Sender<Result<(), String>>,
topic: String,
},
}

#[derive(Clone)]
Expand All @@ -99,15 +102,15 @@ pub(crate) struct NodeServerHandle {

impl NodeServerHandle {
/// Get the URI of the master node.
pub async fn get_master_uri(&self) -> Result<String, NodeError> {
pub(crate) async fn get_master_uri(&self) -> Result<String, NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender
.send(NodeMsg::GetMasterUri { reply: sender })?;
Ok(receiver.await?)
}

/// Get the URI of the client node.
pub async fn get_client_uri(&self) -> Result<String, NodeError> {
pub(crate) async fn get_client_uri(&self) -> Result<String, NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender
.send(NodeMsg::GetClientUri { reply: sender })?;
Expand All @@ -116,7 +119,7 @@ impl NodeServerHandle {

/// Gets the list of topics the node is currently subscribed to.
/// Returns a tuple of (Topic Name, Topic Type) e.g. ("/rosout", "rosgraph_msgs/Log").
pub async fn get_subscriptions(&self) -> Result<Vec<(String, String)>, NodeError> {
pub(crate) async fn get_subscriptions(&self) -> Result<Vec<(String, String)>, NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender
.send(NodeMsg::GetSubscriptions { reply: sender })?;
Expand All @@ -125,7 +128,7 @@ impl NodeServerHandle {

/// Gets the list of topic the node is currently publishing to.
/// Returns a tuple of (Topic Name, Topic Type) e.g. ("/rosout", "rosgraph_msgs/Log").
pub async fn get_publications(&self) -> Result<Vec<(String, String)>, NodeError> {
pub(crate) async fn get_publications(&self) -> Result<Vec<(String, String)>, NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender
.send(NodeMsg::GetPublications { reply: sender })?;
Expand All @@ -134,7 +137,7 @@ impl NodeServerHandle {

/// Updates the list of know publishers for a given topic
/// This is used to know who to reach out to for updates
pub fn set_peer_publishers(
pub(crate) fn set_peer_publishers(
&self,
topic: String,
publishers: Vec<String>,
Expand All @@ -148,14 +151,14 @@ impl NodeServerHandle {
/// This will stop all ROS functionality and poison all NodeHandles connected
/// to the underlying node server.
// TODO this function should probably be pub(crate) and not pub?
pub fn shutdown(&self) -> Result<(), NodeError> {
pub(crate) fn shutdown(&self) -> Result<(), NodeError> {
self.node_server_sender.send(NodeMsg::Shutdown)?;
Ok(())
}

/// Registers a publisher with the underlying node server
/// Returns a channel that the raw bytes of a publish can be shoved into to queue the publish
pub async fn register_publisher<T: RosMessageType>(
pub(crate) async fn register_publisher<T: RosMessageType>(
&self,
topic: &str,
queue_size: usize,
Expand All @@ -177,10 +180,23 @@ impl NodeServerHandle {
})?)
}

pub(crate) async fn unregister_publisher(&self, topic: &str) -> Result<(), NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender.send(NodeMsg::UnregisterPublisher {
reply: sender,
topic: topic.to_owned(),
})?;
let rx = receiver.await?;
rx.map_err(|err| {
warn!("Failure while unregistering publisher: {err:?}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})
}

/// Registers a service client with the underlying node server
/// This returns a channel that can be used for making service calls
/// service calls will be queued in the channel and resolved when able.
pub async fn register_service_client<T: RosServiceType>(
pub(crate) async fn register_service_client<T: RosServiceType>(
&self,
service_name: &Name,
) -> Result<ServiceClient<T>, NodeError> {
Expand Down Expand Up @@ -209,7 +225,7 @@ impl NodeServerHandle {
Ok(ServiceClient::new(service_name, sender, link))
}

pub async fn register_service_server<T, F>(
pub(crate) async fn register_service_server<T, F>(
&self,
service_name: &Name,
server: F,
Expand Down Expand Up @@ -256,7 +272,7 @@ impl NodeServerHandle {

/// Called to remove a service server
/// Delegates to the NodeServer via channel
pub async fn unadvertise_service(&self, service_name: &str) -> Result<(), NodeError> {
pub(crate) async fn unadvertise_service(&self, service_name: &str) -> Result<(), NodeError> {
let (tx, rx) = oneshot::channel();
log::debug!("Queuing unregister service server command for: {service_name:?}");
self.node_server_sender
Expand All @@ -274,7 +290,7 @@ impl NodeServerHandle {
/// If this is the first time the given topic has been subscribed to (by this node)
/// rosmaster will be informed.
/// Otherwise, a new rx handle will simply be returned to the existing channel.
pub async fn register_subscriber<T: RosMessageType>(
pub(crate) async fn register_subscriber<T: RosMessageType>(
&self,
topic: &str,
queue_size: usize,
Expand Down Expand Up @@ -302,15 +318,13 @@ impl NodeServerHandle {
// to marshal the response.
// Users can call this function, but it really doesn't serve much of a purpose outside ROS Pub/Sub communication
// negotiation
pub async fn request_topic(
pub(crate) async fn request_topic(
&self,
caller_id: &str,
topic: &str,
protocols: &[String],
) -> Result<ProtocolParams, NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender.send(NodeMsg::RequestTopic {
caller_id: caller_id.to_owned(),
topic: topic.to_owned(),
protocols: protocols.into(),
reply: sender,
Expand Down Expand Up @@ -346,10 +360,12 @@ pub(crate) struct Node {
// service_clients: HashMap<String, ServiceClientLink>,
// Map of topic names to service server handles for each topic
service_servers: HashMap<String, ServiceServerLink>,
// TODO need signal to shutdown xmlrpc server when node is dropped
// TODO MAJOR: need signal to shutdown xmlrpc server when node is dropped
host_addr: Ipv4Addr,
hostname: String,
node_name: Name,
// Store a handle to ourself so that we can pass it out later
node_handle: NodeServerHandle,
}

impl Node {
Expand All @@ -371,6 +387,10 @@ impl Node {

let rosmaster_client =
MasterClient::new(master_uri, client_uri, node_name.to_string()).await?;
let weak_handle = NodeServerHandle {
node_server_sender: node_sender.clone(),
_node_task: None,
};
let mut node = Self {
client: rosmaster_client,
_xmlrpc_server: xmlrpc_server,
Expand All @@ -381,6 +401,7 @@ impl Node {
host_addr: addr,
hostname: hostname.to_owned(),
node_name: node_name.to_owned(),
node_handle: weak_handle,
};

let t = Arc::new(
Expand Down Expand Up @@ -476,6 +497,13 @@ impl Node {
}
.expect("Failed to reply on oneshot");
}
NodeMsg::UnregisterPublisher { reply, topic } => {
let _ = reply.send(
self.unregister_publisher(&topic)
.await
.map_err(|err| err.to_string()),
);
}
NodeMsg::RegisterSubscriber {
reply,
topic,
Expand Down Expand Up @@ -543,7 +571,6 @@ impl Node {
reply,
topic,
protocols,
..
} => {
// TODO: Should move the actual implementation similar to RegisterPublisher
if protocols
Expand Down Expand Up @@ -625,8 +652,16 @@ impl Node {
self.publishers.iter().find_map(|(key, value)| {
if key.as_str() == &topic {
if value.topic_type() == topic_type {
Some(Ok(value.get_sender()))
if let Some(sender) = value.get_sender() {
return Some(Ok(sender));
}else{
// Edge case here
// The channel for the publication is closed, but publication hasn't been cleaned up yet
None
}
} else {
warn!("Attempted to register publisher with different topic type than existing publisher: existing_type={}, new_type={}", value.topic_type(), topic_type);
// TODO MAJOR: this is a terrible error type to return...
Some(Err(NodeError::IoError(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
Expand All @@ -636,30 +671,48 @@ impl Node {
}
})
};
// If we found an existing publication return the handle to it
if let Some(handle) = existing_entry {
Ok(handle?)
} else {
// Otherwise create a new Publication
let channel = Publication::new(
&self.node_name,
latching,
&topic,
self.host_addr,
queue_size,
&msg_definition,
&md5sum,
topic_type,
)
.await
.map_err(|err| {
log::error!("Failed to create publishing channel: {err:?}");
err
})?;
let handle = channel.get_sender();
self.publishers.insert(topic.clone(), channel);
let _current_subscribers = self.client.register_publisher(&topic, topic_type).await?;
Ok(handle)
return Ok(handle?);
}

// Otherwise create a new Publication
let (channel, sender) = Publication::new(
&self.node_name,
latching,
&topic,
self.host_addr,
queue_size,
&msg_definition,
&md5sum,
topic_type,
self.node_handle.clone(),
)
.await
.map_err(|err| {
log::error!("Failed to create publishing channel: {err:?}");
err
})?;
self.publishers.insert(topic.clone(), channel);
Ok(sender)
}

async fn unregister_publisher(&mut self, topic: &str) -> Result<(), NodeError> {
// Tell ros master we are no longer publishing this topic
let err1 = self.client.unregister_publisher(topic).await;
// Remove the publication from our internal state
let err2 = self.publishers.remove(topic);
if err1.is_err() || err2.is_none() {
error!(
"Failure unregistering publisher: {err1:?}, {}",
err2.is_none()
);
// MAJOR TODO: this is a terrible error type to return...
return Err(NodeError::IoError(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
)));
}
Ok(())
}

/// Checks the internal state of the NodeServer to see if it has a service client registered for this service already
Expand Down
1 change: 1 addition & 0 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl NodeHandle {
Ok(ServiceServer::new(service_name, self.clone()))
}

// TODO Major: This should probably be moved to NodeServerHandle?
/// Not intended to be called manually
/// Stops hosting the specified server.
/// This is automatically called when dropping the ServiceServer returned by [advertise_service]
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
net::{IpAddr, Ipv4Addr},
};

mod actor;
pub(crate) mod actor;
mod handle;
mod xmlrpc;
use actor::*;
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/node/xmlrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl XmlRpcServer {
let protocols = protocols.iter().flatten().cloned().collect::<Vec<_>>();
debug!("Request for topic {topic} from {caller_id} via protocols {protocols:?}");
let params = node_server
.request_topic(&caller_id, &topic, &protocols)
.request_topic(&topic, &protocols)
.await
.map_err(|e| {
Self::make_error_response(
Expand Down
Loading

0 comments on commit ed84f6b

Please sign in to comment.