Skip to content

Commit

Permalink
Working client service call example
Browse files Browse the repository at this point in the history
  • Loading branch information
carter committed May 21, 2024
1 parent a960266 commit 38c2021
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 33 deletions.
31 changes: 31 additions & 0 deletions roslibrust/examples/ros1_service_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_interfaces");

#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
use roslibrust::ros1::NodeHandle;

simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Debug)
.without_timestamps()
.init()
.unwrap();

let nh = NodeHandle::new("http://localhost:11311", "/service_client_rs").await?;
log::info!("Connected!");

let response: rosapi::GetTimeResponse = nh
.service_client::<rosapi::GetTime>("rosapi/get_time")
.await?
.call(&rosapi::GetTimeRequest {})
.await?;

log::info!("Got time: {:?}", response);

Ok(())
}

#[cfg(not(feature = "ros1"))]
fn main() {
eprintln!("This example does nothing without compiling with the feature 'ros1'");
}
30 changes: 17 additions & 13 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ impl NodeServerHandle {
pub async fn register_service_client<T: RosServiceType>(
&self,
service_name: &Name,
) -> RosLibRustResult<mpsc::UnboundedSender<CallServiceRequest>> {
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, NodeError> {
// Create a channel for hooking into the node server
let (sender, receiver) = oneshot::channel();
match self
.node_server_sender

// Send the request to the node server and see if it accepts it
self.node_server_sender
.send(NodeMsg::RegisterServiceClient {
reply: sender,
service: service_name.to_owned(),
Expand All @@ -160,16 +162,12 @@ impl NodeServerHandle {
[T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter(),
),
md5sum: T::MD5SUM.to_owned(),
}) {
Ok(()) => Ok(receiver
.await
.map_err(|_err| RosLibRustError::Disconnected)?
.map_err(|err| {
log::error!("Unable to register service client: {err}");
RosLibRustError::Disconnected
})?),
Err(_err) => Err(RosLibRustError::Disconnected),
}
})?;
let received = receiver.await?;
Ok(received.map_err(|err| {
log::error!("Failed to register service client: {err}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}

pub async fn register_subscriber<T: RosMessageType>(
Expand Down Expand Up @@ -512,7 +510,9 @@ impl Node {
srv_definition: &str,
md5sum: &str,
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, Box<dyn std::error::Error>> {
log::debug!("Registering service client for {service}");
let service_name = service.resolve_to_global(&self.node_name).to_string();

let existing_entry = {
self.service_clients.iter().find_map(|(key, value)| {
if key.as_str() == &service_name {
Expand All @@ -530,9 +530,12 @@ impl Node {
};

if let Some(handle) = existing_entry {
log::debug!("Found existing service client for {service}, returning existing handle");
Ok(handle?)
} else {
log::debug!("Creating new service client for {service}");
let service_uri = self.client.lookup_service(&service_name).await?;
log::debug!("Found service at {service_uri}");
let server_link = ServiceServerLink::new(
&self.node_name,
&service_name,
Expand All @@ -542,6 +545,7 @@ impl Node {
md5sum,
)
.await?;

let handle = server_link.get_sender();
self.service_clients.insert(service_name, server_link);
Ok(handle)
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl NodeHandle {
pub async fn service_client<T: roslibrust_codegen::RosServiceType>(
&self,
service_name: &str,
) -> RosLibRustResult<ServiceClient<T>> {
) -> Result<ServiceClient<T>, NodeError> {
let service_name = Name::new(service_name)?;
let sender = self
.inner
Expand Down
9 changes: 5 additions & 4 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,20 @@ impl Publication {
latching,
msg_definition: msg_definition.to_owned(),
md5sum: md5sum.to_owned(),
topic: topic_name.to_owned(),
topic: Some(topic_name.to_owned()),
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
service: None,
};

let subscriber_streams = Arc::new(RwLock::new(Vec::new()));

let subscriber_streams_copy = subscriber_streams.clone();
let topic_name = topic_name.to_owned();
let listener_handle = tokio::spawn(async move {
let subscriber_streams = subscriber_streams_copy;
loop {
if let Ok((mut stream, peer_addr)) = tcp_listener.accept().await {
let topic_name = responding_conn_header.topic.as_str();
log::info!(
"Received connection from subscriber at {peer_addr} for topic {topic_name}"
);
Expand All @@ -93,7 +94,7 @@ impl Publication {
{
if connection_header.md5sum == responding_conn_header.md5sum {
log::debug!(
"Received subscribe request for {}",
"Received subscribe request for {:?}",
connection_header.topic
);
// Write our own connection header in response
Expand All @@ -107,7 +108,7 @@ impl Publication {
let mut wlock = subscriber_streams.write().await;
wlock.push(stream);
log::debug!(
"Added stream for topic {} to subscriber {}",
"Added stream for topic {:?} to subscriber {:?}",
connection_header.topic,
peer_addr
);
Expand Down
18 changes: 16 additions & 2 deletions roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,18 @@ impl<T: RosServiceType> ServiceClient<T> {

match response_rx.await {
Ok(Ok(result_payload)) => {
let response: T::Response = serde_rosmsg::from_slice(&result_payload[..])
log::debug!(
"Service client for {} got response: {:?}",
self.service_name,
result_payload
);

// Okay the 1.. is funky and needs to be addressed
// This is a little buried in the ROS documentation by the first byte is the "success" byte
// if it is 1 then the rest of the payload is the response
// Otherwise ros silently swaps the payload out for an error message
// We need to parse that error message and display somewhere
let response: T::Response = serde_rosmsg::from_slice(&result_payload[1..])
.map_err(|err| RosLibRustError::SerializationError(err.to_string()))?;
return Ok(response);
}
Expand Down Expand Up @@ -87,7 +98,10 @@ impl ServiceServerLink {
latching: false,
msg_definition: srv_definition.to_owned(),
md5sum: md5sum.to_owned(),
topic: service_name.to_owned(),
// Note: using "topic" indicates a subscription
// using "service" indicates a service client
topic: None,
service: Some(service_name.to_owned()),
topic_type: service_type.to_owned(),
tcp_nodelay: false,
};
Expand Down
9 changes: 5 additions & 4 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ impl Subscription {
latching: false,
msg_definition,
md5sum,
topic: topic_name.to_owned(),
topic: Some(topic_name.to_owned()),
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
service: None,
};

Self {
Expand Down Expand Up @@ -97,7 +98,7 @@ impl Subscription {

if is_new_connection {
let node_name = self.connection_header.caller_id.clone();
let topic_name = self.connection_header.topic.clone();
let topic_name = self.connection_header.topic.as_ref().unwrap().clone();
let connection_header = self.connection_header.clone();
let sender = self.msg_sender.clone();
let publisher_list = self.known_publishers.clone();
Expand Down Expand Up @@ -128,7 +129,7 @@ impl Subscription {
}
read_buffer.clear();
} else {
log::warn!("Got an error reading from the publisher connection on topic {topic_name}, closing");
log::warn!("Got an error reading from the publisher connection on topic {topic_name:?}, closing");
}
}
}
Expand Down Expand Up @@ -161,7 +162,7 @@ async fn establish_publisher_connection(
if let Ok(responded_header) = ConnectionHeader::from_bytes(&responded_header_bytes[..bytes]) {
if conn_header.md5sum == responded_header.md5sum {
log::debug!(
"Established connection with publisher for {}",
"Established connection with publisher for {:?}",
conn_header.topic
);
Ok(stream)
Expand Down
57 changes: 48 additions & 9 deletions roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ use tokio::net::TcpStream;
use super::names::Name;

// Implementation of ConnectionHeader is based off of ROS documentation here:
// wiki.ros.org/ROS/Connection%20Header
// https://wiki.ros.org/ROS/Connection%20Header
// and here:
// https://wiki.ros.org/ROS/TCPROS
#[derive(Clone, Debug)]

pub struct ConnectionHeader {
pub caller_id: String,
pub latching: bool,
pub msg_definition: String,
pub md5sum: String,
pub topic: String,
// TODO we may want to distinguish between service and topic headers with different types?
pub service: Option<String>,
pub topic: Option<String>,
pub topic_type: String,
pub tcp_nodelay: bool,
}
Expand All @@ -25,7 +30,8 @@ impl ConnectionHeader {
let mut caller_id = String::new();
let mut latching = false;
let mut md5sum = String::new();
let mut topic = String::new();
let mut topic = None;
let mut service = None;
let mut topic_type = String::new();
let mut tcp_nodelay = false;

Expand All @@ -51,13 +57,21 @@ impl ConnectionHeader {
} else if field.starts_with("md5sum=") {
field[equals_pos + 1..].clone_into(&mut md5sum);
} else if field.starts_with("topic=") {
field[equals_pos + 1..].clone_into(&mut topic);
let mut topic_str = String::new();
field[equals_pos + 1..].clone_into(&mut topic_str);
topic = Some(topic_str);
} else if field.starts_with("service=") {
let mut service_str = String::new();
field[equals_pos + 1..].clone_into(&mut service_str);
service = Some(service_str);
} else if field.starts_with("type=") {
field[equals_pos + 1..].clone_into(&mut topic_type);
} else if field.starts_with("tcp_nodelay=") {
let mut tcp_nodelay_str = String::new();
field[equals_pos + 1..].clone_into(&mut tcp_nodelay_str);
tcp_nodelay = &tcp_nodelay_str != "0";
} else if field.starts_with("error=") {
log::error!("Error reported in TCPROS connection header: {field}");
} else {
log::warn!("Encountered unhandled field in connection header: {field}");
}
Expand All @@ -69,6 +83,7 @@ impl ConnectionHeader {
msg_definition,
md5sum,
topic,
service,
topic_type,
tcp_nodelay,
})
Expand Down Expand Up @@ -101,9 +116,17 @@ impl ConnectionHeader {
header_data.write(tcp_nodelay.as_bytes())?;
}

let topic = format!("topic={}", self.topic);
header_data.write_u32::<LittleEndian>(topic.len() as u32)?;
header_data.write(topic.as_bytes())?;
if let Some(topic) = self.topic.as_ref() {
let topic = format!("topic={}", topic);
header_data.write_u32::<LittleEndian>(topic.len() as u32)?;
header_data.write(topic.as_bytes())?;
}

if let Some(service) = self.service.as_ref() {
let service = format!("service={}", service);
header_data.write_u32::<LittleEndian>(service.len() as u32)?;
header_data.write(service.as_bytes())?;
}

let topic_type = format!("type={}", self.topic_type);
header_data.write_u32::<LittleEndian>(topic_type.len() as u32)?;
Expand All @@ -118,6 +141,8 @@ impl ConnectionHeader {
}
}

/// Creates a new TCP connection to the given server URI and sends the connection header.
/// The only current user of this is service clients.
pub async fn establish_connection(
node_name: &Name,
topic_name: &str,
Expand All @@ -126,7 +151,21 @@ pub async fn establish_connection(
) -> Result<TcpStream, std::io::Error> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};

let mut stream = TcpStream::connect(server_uri).await?;
// Okay in Shane's version of this the server_uri is coming in as "rosrpc://localhost:41105"
// which is causing this to break...
// Not sure what that is the fault of, but I'm going to try to clean up the address here
// I think the correct way of doing this would be with some king of URI parsing library, for now being very lazy
// TODO confirm if all libs respond with rosrpc:// or not...
let server_uri = &server_uri.replace("rosrpc://", "");

let mut stream = TcpStream::connect(server_uri).await.map_err(
|err| {
log::error!(
"Failed to establish TCP connection to server {server_uri} for topic {topic_name}: {err}"
);
err
},
)?;

let conn_header_bytes = conn_header.to_bytes(true)?;
stream.write_all(&conn_header_bytes[..]).await?;
Expand All @@ -140,7 +179,7 @@ pub async fn establish_connection(
if let Ok(responded_header) = ConnectionHeader::from_bytes(&responded_header_bytes[..bytes]) {
if conn_header.md5sum == responded_header.md5sum {
log::debug!(
"Established connection with node {node_name} for {}",
"Established connection with node {node_name} for topic {:?}",
conn_header.topic
);
Ok(stream)
Expand Down

0 comments on commit 38c2021

Please sign in to comment.