Skip to content

Commit

Permalink
ROS1 Nodes clean up their subscriptions, advertises, and services on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
Carter committed Oct 21, 2024
1 parent 01693e7 commit 41c4b77
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 60 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Keeping a ros1::ServiceServer alive no longer keeps the underlying node alive after the last ros1::NodeHandle is dropped.
- Dropping the last ros1::NodeHandle results in the node cleaning up any advertises, subscriptions, and services with the ROS master.

### Changed

## 0.11.1
Expand Down
1 change: 1 addition & 0 deletions roslibrust/src/ros1/master_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum RosMasterError {

/// A client that exposes the API hosted by the [rosmaster](http://wiki.ros.org/ROS/Master_API)
// TODO consider exposing this type publicly
#[derive(Clone)] // Note is clone to support an odd case in Node::drop
pub struct MasterClient {
client: reqwest::Client,
// Address at which the rosmaster should be found
Expand Down
34 changes: 21 additions & 13 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub(crate) struct NodeServerHandle {
// If this handle should keep the underlying node task alive it will hold an
// Arc to the underlying node task. This is an option because internal handles
// within the node shouldn't keep it alive (e.g. what we hand to xml server)
_node_task: Option<Arc<ChildTask<()>>>,
pub(crate) _node_task: Option<Arc<ChildTask<()>>>,
}

impl NodeServerHandle {
Expand Down Expand Up @@ -857,39 +857,47 @@ impl Node {
// Clears any extant node connections with the ros master
// This is not expected to be called anywhere other than the drop impl
fn shutdown(&mut self) {
let future = async {
// Based on this answer: 3b https://stackoverflow.com/questions/71541765/rust-async-drop
// Make copies of what we need to shut down
let client = self.client.clone();
let subscriptions = std::mem::take(&mut self.subscriptions);
let publishers = std::mem::take(&mut self.publishers);
let service_servers = std::mem::take(&mut self.service_servers);
let host_addr = self.host_addr;

// Move copies into a future that will do the clean-ups
let future = async move {
debug!("Start shutdown node");
// Note: we're ignoring all failures here and doing best effort cleanup
// Many of these log messages will be incorrect until we get our cleanup logic dialed in.
for (topic, _subscriptions) in &self.subscriptions {
for (topic, _subscriptions) in &subscriptions {
debug!("Node shutdown is cleaning up subscription: {topic}");
let _ = self.client.unregister_subscriber(topic).await.map_err(|e| {
let _ = client.unregister_subscriber(topic).await.map_err(|e| {
error!("Failed to unregister subscriber for topic: {topic} while shutting down node");
e
});
debug!("CHECK");
}

for (topic, _publication) in &self.publishers {
for (topic, _publication) in &publishers {
debug!("Node shutdown is cleaning up publishing: {topic}");
let _ = self.client.unregister_publisher(topic).await.map_err(|e| {
let _ = client.unregister_publisher(topic).await.map_err(|e| {
error!("Failed to unregister publisher for topic: {topic} while shutting down node.");
e
});
}

for (topic, service_link) in &self.service_servers {
for (topic, service_link) in &service_servers {
debug!("Node shutdown is cleaning up service: {topic}");
let uri = format!("rosrpc://{}:{}", self.host_addr, service_link.port());
let _ = self.client.unregister_service(topic, uri).await.map_err(|e| {
let uri = format!("rosrpc://{}:{}", host_addr, service_link.port());
let _ = client.unregister_service(topic, uri).await.map_err(|e| {
error!("Failed to unregister server server for topic: {topic} while shutting down node.");
e
});
}
};

let runtime = tokio::runtime::Handle::try_current().expect("Roslibrust should always be run inside tokio runtime");
// Run the shutdown tasks to completion
runtime.block_on(future);
// Spawn shutdown operation in a separate task
tokio::spawn(future);
}
}

Expand Down
15 changes: 14 additions & 1 deletion roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ impl NodeHandle {
Ok(nh)
}

/// This creates a clone() of NodeHandle that doesn't keep the underlying node alive
/// This should be used for things like ServiceServer which wants to be able to talk to the node
/// but doesn't need to keep the node alive.
pub(crate) fn weak_clone(&self) -> NodeHandle {
NodeHandle {
inner: NodeServerHandle {
node_server_sender: self.inner.node_server_sender.clone(),
_node_task: None,
},
}
}

/// This function may be removed...
/// All node handles connect to a backend node server that actually handles the communication with ROS
/// If this function returns false, the backend node server has shut down and this handle is invalid.
Expand Down Expand Up @@ -148,7 +160,8 @@ impl NodeHandle {
.inner
.register_service_server::<T, F>(&service_name, server)
.await?;
Ok(ServiceServer::new(service_name, self.clone()))
// Super important. Don't clone self or we create a STRONG NodeHandle that keeps the node alive
Ok(ServiceServer::new(service_name, self.weak_clone()))
}

// TODO Major: This should probably be moved to NodeServerHandle?
Expand Down
92 changes: 46 additions & 46 deletions roslibrust/tests/ros1_native_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,61 +469,61 @@ mod tests {
/// Test that we correctly purge references to publishers, subscribers, and service servers when a node shuts down
#[test_log::test(tokio::test)]
async fn node_cleanup() {
let future = async {
// Create our node
// this nh controls the lifetimes
let nh = NodeHandle::new("http://localhost:11311", "/test_node_cleanup")
.await
.unwrap();

// Create a publisher so a topic will connected
let _publisher = nh
.advertise::<std_msgs::Header>("/test_cleanup_pub", 1, false)
.await
.unwrap();
// Create our node
// this nh controls the lifetimes
let nh = NodeHandle::new("http://localhost:11311", "/test_node_cleanup")
.await
.unwrap();

let _subscriber = nh
.subscribe::<std_msgs::Header>("/test_cleanup_sub", 1)
.await
.unwrap();
// Create pub, sub, and service server to prove all get cleaned up
let _publisher = nh
.advertise::<std_msgs::Header>("/test_cleanup_pub", 1, false)
.await
.unwrap();

let _service_server = nh
.advertise_service::<std_srvs::Trigger, _>("/test_cleanup_srv", |_req| {
Ok(Default::default())
})
.await
.unwrap();
let _subscriber = nh
.subscribe::<std_msgs::Header>("/test_cleanup_sub", 1)
.await
.unwrap();

let master_client = roslibrust::ros1::MasterClient::new(
"http://localhost:11311",
"NAN",
"/test_node_cleanup_checker",
)
let _service_server = nh
.advertise_service::<std_srvs::Trigger, _>("/test_cleanup_srv", |_req| {
Ok(Default::default())
})
.await
.unwrap();

let data = master_client.get_system_state().await.unwrap();
info!("Got data before drop: {data:?}");
let master_client = roslibrust::ros1::MasterClient::new(
"http://localhost:11311",
"NAN",
"/test_node_cleanup_checker",
)
.await
.unwrap();

// Check that our three connections are reported by the ros master before starting
assert!(data.is_publishing("/test_cleanup_pub", "/test_node_cleanup"));
assert!(data.is_subscribed("/test_cleanup_sub", "/test_node_cleanup"));
assert!(data.is_service_provider("/test_cleanup_srv", "/test_node_cleanup"));
let data = master_client.get_system_state().await.unwrap();
info!("Got data before drop: {data:?}");

// Drop our node handle
std::mem::drop(nh);
// Check that our three connections are reported by the ros master before starting
assert!(data.is_publishing("/test_cleanup_pub", "/test_node_cleanup"));
assert!(data.is_subscribed("/test_cleanup_sub", "/test_node_cleanup"));
assert!(data.is_service_provider("/test_cleanup_srv", "/test_node_cleanup"));

// Delay to allow destructor to complete
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let data = master_client.get_system_state().await.unwrap();
info!("Got data after drop: {data:?}");
// Drop our node handle
std::mem::drop(nh);

// Check that our three connections are no longer reported by the ros master after dropping
assert!(!data.is_publishing("/test_cleanup_pub", "/test_node_cleanup"));
assert!(!data.is_subscribed("/test_cleanup_sub", "/test_node_cleanup"));
assert!(!data.is_service_provider("/test_cleanup_srv", "/test_node_cleanup"));
};
// 1 second to complete the whole test.
tokio::time::timeout(tokio::time::Duration::from_secs(1), future).await.unwrap();
// Confirm here that Node actually got shut down
debug!("Drop has happened");
// Delay to allow destructor to complete
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
debug!("sleep is over");
let data = master_client.get_system_state().await.unwrap();
info!("Got data after drop: {data:?}");

// Check that our three connections are no longer reported by the ros master after dropping
assert!(!data.is_publishing("/test_cleanup_pub", "/test_node_cleanup"));
assert!(!data.is_subscribed("/test_cleanup_sub", "/test_node_cleanup"));
assert!(!data.is_service_provider("/test_cleanup_srv", "/test_node_cleanup"));
}

}

0 comments on commit 41c4b77

Please sign in to comment.