From dbc153516bfc3798e0f41633b0794ee3c9bb0d97 Mon Sep 17 00:00:00 2001 From: Lucas Walter Date: Sun, 11 Aug 2024 07:52:17 -0700 Subject: [PATCH] publish a vector of bytes given a topic type and message definition --- CHANGELOG.md | 1 + roslibrust/src/ros1/node/actor.rs | 26 ++++++++++++++ roslibrust/src/ros1/node/handle.rs | 19 ++++++++-- roslibrust/src/ros1/publisher.rs | 35 ++++++++++++++++++- roslibrust/src/ros1/subscriber.rs | 1 + .../tests/ros1_native_integration_tests.rs | 32 +++++++++++++++++ 6 files changed, 111 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9215f28..fbddd6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - The XML RPC client for interacting directly with the rosmaster server has been exposed as a public API - Experimental: Initial support for writing generic clients that can be compile time specialized for rosbridge or ros1 - Can subscribe to any topic and get raw bytes instead of a deserialized message of known type +- Can publish to any topic and send raw bytes instead of a deserialized message ### Fixed diff --git a/roslibrust/src/ros1/node/actor.rs b/roslibrust/src/ros1/node/actor.rs index 5d83fc9..f2d7794 100644 --- a/roslibrust/src/ros1/node/actor.rs +++ b/roslibrust/src/ros1/node/actor.rs @@ -176,6 +176,32 @@ impl NodeServerHandle { })?) } + /// Registers a publisher with the underlying node server + /// Returns a channel that the raw bytes of a publish can be shoved into to queue the publish + pub(crate) async fn register_publisher_any( + &self, + topic: &str, + topic_type: &str, + msg_definition: &str, + queue_size: usize, + latching: bool, + ) -> Result>, NodeError> { + let (sender, receiver) = oneshot::channel(); + self.node_server_sender.send(NodeMsg::RegisterPublisher { + reply: sender, + topic: topic.to_owned(), + topic_type: topic_type.to_owned(), + queue_size, + msg_definition: msg_definition.to_owned(), + md5sum: "*".to_string(), + latching, + })?; + let received = receiver.await?; + Ok(received.map_err(|_err| { + NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) + })?) + } + pub(crate) async fn unregister_publisher(&self, topic: &str) -> Result<(), NodeError> { let (sender, receiver) = oneshot::channel(); self.node_server_sender.send(NodeMsg::UnregisterPublisher { diff --git a/roslibrust/src/ros1/node/handle.rs b/roslibrust/src/ros1/node/handle.rs index 3ef3e6e..32e80c4 100644 --- a/roslibrust/src/ros1/node/handle.rs +++ b/roslibrust/src/ros1/node/handle.rs @@ -1,8 +1,8 @@ use super::actor::{Node, NodeServerHandle}; use crate::{ ros1::{ - names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber, - subscriber::SubscriberAny, NodeError, ServiceServer, + names::Name, publisher::Publisher, publisher::PublisherAny, service_client::ServiceClient, + subscriber::Subscriber, subscriber::SubscriberAny, NodeError, ServiceServer, }, ServiceFn, }; @@ -62,6 +62,21 @@ impl NodeHandle { /// Subsequent calls will simply be given additional handles to the underlying publication. /// This behavior was chosen to mirror ROS1's API, however it is reccomended to .clone() the returend publisher /// instead of calling this function multiple times. + pub async fn advertise_any( + &self, + topic_name: &str, + topic_type: &str, + msg_definition: &str, + queue_size: usize, + latching: bool, + ) -> Result { + let sender = self + .inner + .register_publisher_any(topic_name, topic_type, msg_definition, queue_size, latching) + .await?; + Ok(PublisherAny::new(topic_name, sender)) + } + pub async fn advertise( &self, topic_name: &str, diff --git a/roslibrust/src/ros1/publisher.rs b/roslibrust/src/ros1/publisher.rs index aef331d..50f94f5 100644 --- a/roslibrust/src/ros1/publisher.rs +++ b/roslibrust/src/ros1/publisher.rs @@ -49,6 +49,38 @@ impl Publisher { } } +pub struct PublisherAny { + topic_name: String, + sender: mpsc::Sender>, + phantom: PhantomData>, +} + +impl PublisherAny { + pub(crate) fn new(topic_name: &str, sender: mpsc::Sender>) -> Self { + Self { + topic_name: topic_name.to_owned(), + sender, + phantom: PhantomData, + } + } + + /// Queues a message to be send on the related topic. + /// Returns when the data has been queued not when data is actually sent. + pub async fn publish(&self, data: &Vec) -> Result<(), PublisherError> { + // TODO this is a pretty dumb... + // because of the internal channel used for re-direction this future doesn't + // actually complete when the data is sent, but merely when it is queued to be sent + // This function could probably be non-async + // Or we should do some significant re-work to have it only yield when the data is sent. + self.sender + .send(data.to_vec()) + .await + .map_err(|_| PublisherError::StreamClosed)?; + debug!("Publishing data on topic {}", self.topic_name); + Ok(()) + } +} + pub(crate) struct Publication { topic_type: String, listener_port: u16, @@ -250,7 +282,8 @@ impl Publication { if let Some(connection_md5sum) = connection_header.md5sum { if connection_md5sum != "*" { if let Some(local_md5sum) = &responding_conn_header.md5sum { - if connection_md5sum != *local_md5sum { + // TODO(lucasw) is it ok to match any with "*"? + if local_md5sum != "*" && connection_md5sum != *local_md5sum { warn!( "Got subscribe request for {}, but md5sums do not match. Expected {:?}, received {:?}", topic_name, diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index 900453b..1200dd9 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -179,6 +179,7 @@ async fn establish_publisher_connection( if let Ok(responded_header) = tcpros::receive_header(&mut stream).await { if conn_header.md5sum == Some("*".to_string()) + || responded_header.md5sum == Some("*".to_string()) || conn_header.md5sum == responded_header.md5sum { log::debug!( diff --git a/roslibrust/tests/ros1_native_integration_tests.rs b/roslibrust/tests/ros1_native_integration_tests.rs index 710be2f..7ba0df2 100644 --- a/roslibrust/tests/ros1_native_integration_tests.rs +++ b/roslibrust/tests/ros1_native_integration_tests.rs @@ -12,6 +12,38 @@ mod tests { "assets/ros1_common_interfaces" ); + #[test_log::test(tokio::test)] + async fn test_publish_any() { + // publish a single message in raw bytes and test the received message is as expected + let nh = NodeHandle::new("http://localhost:11311", "test_publish_any") + .await + .unwrap(); + + let publisher = nh + .advertise_any( + "/test_publish_any", + "std_msgs/String", + "string data\n", + 1, + true, + ) + .await + .unwrap(); + + let mut subscriber = nh + .subscribe::("/test_publish_any", 1) + .await + .unwrap(); + + let msg_raw: Vec = vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116].to_vec(); + publisher.publish(&msg_raw).await.unwrap(); + + let res = + tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; + let msg = res.unwrap().unwrap().unwrap(); + assert_eq!(msg.data, "test"); + } + #[test_log::test(tokio::test)] async fn test_subscribe_any() { // get a single message in raw bytes and test the bytes are as expected