diff --git a/Cargo.toml b/Cargo.toml index e91f1802d..24e452c42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ members = [ "r2r_msg_gen", "r2r_macros", "r2r_rcl", + "r2r_tracing", ] diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index d7fd214a2..d6f8f84ea 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -23,6 +23,7 @@ r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" } r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" } r2r_actions = { path = "../r2r_actions", version = "0.9.5" } r2r_macros = { path = "../r2r_macros", version = "0.9.5" } +r2r_tracing = { path = "../r2r_tracing", version = "0.9.5" } uuid = { version = "1.2.2", features = ["serde", "v4"] } futures = "0.3.25" log = "0.4.18" @@ -50,6 +51,7 @@ prettyplease = "0.2.6" [features] save-bindgen = ["r2r_rcl/save-bindgen", "r2r_msg_gen/save-bindgen", "r2r_actions/save-bindgen"] doc-only = ["r2r_common/doc-only", "r2r_rcl/doc-only", "r2r_msg_gen/doc-only", "r2r_actions/doc-only"] +tracing = ["r2r_tracing/tracing"] [package.metadata.docs.rs] features = ["doc-only"] diff --git a/r2r/examples/callback_tracing.rs b/r2r/examples/callback_tracing.rs new file mode 100644 index 000000000..51433b279 --- /dev/null +++ b/r2r/examples/callback_tracing.rs @@ -0,0 +1,64 @@ +use std::time::Duration; + +use futures::{executor::LocalPool, task::LocalSpawnExt}; + +use r2r::{std_msgs::msg, std_srvs::srv, QosProfile}; + +/// This example demonstrates creation of a service, +/// subscriber and timers with their callback execution traced. +fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + // The traced callback is supplied directly to `subscribe_traced` + // and `create_service_traced`functions. + let subscriber_future = node + .subscribe("/print", QosProfile::default())? + .traced_callback(|msg: msg::String| { + println!("Received message: '{}'", msg.data); + }); + spawner.spawn_local(subscriber_future)?; + + let mut value = false; + let service_future = node + .create_service::("/set_value", QosProfile::default())? + .traced_callback(move |req| { + if value == req.message.data { + req.respond(srv::SetBool::Response { + success: false, + message: format!("Value is already {value}."), + }) + .expect("could not send service response"); + } else { + value = req.message.data; + req.respond(srv::SetBool::Response { + success: true, + message: format!("Value set to {value}."), + }) + .expect("could not send service response"); + } + }); + spawner.spawn_local(service_future)?; + + let mut counter = 0; + let wall_timer_future = + node.create_wall_timer(Duration::from_millis(500))? + .on_tick(move |_| { + counter += 1; + println!("Wall timer tick: {counter}"); + }); + spawner.spawn_local(wall_timer_future)?; + + let ros_timer_future = node.create_timer(Duration::from_secs(1))?.on_tick(|diff| { + println!("ROS timer tick. Time elapsed since last tick: {diff:?}"); + }); + spawner.spawn_local(ros_timer_future)?; + + loop { + node.spin_once(std::time::Duration::from_millis(5)); + pool.run_until_stalled(); + } +} diff --git a/r2r/src/clients.rs b/r2r/src/clients.rs index 67d2d68ed..2b5267483 100644 --- a/r2r/src/clients.rs +++ b/r2r/src/clients.rs @@ -318,11 +318,10 @@ impl Client_ for UntypedClient_ { } } -pub fn create_client_helper( - node: *mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t, - qos_profile: QosProfile, -) -> Result { - let mut client_handle = unsafe { rcl_get_zero_initialized_client() }; +pub unsafe fn create_client_helper( + client_handle: &mut rcl_client_t, node: *mut rcl_node_t, service_name: &str, + service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile, +) -> Result<()> { let service_name_c_string = CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; @@ -330,15 +329,16 @@ pub fn create_client_helper( let mut client_options = rcl_client_get_default_options(); client_options.qos = qos_profile.into(); rcl_client_init( - &mut client_handle, + client_handle, node, service_ts, service_name_c_string.as_ptr(), &client_options, ) }; + if result == RCL_RET_OK as i32 { - Ok(client_handle) + Ok(()) } else { Err(Error::from_rcl_error(result)) } diff --git a/r2r/src/lib.rs b/r2r/src/lib.rs index 904485a7b..4b52aee1c 100644 --- a/r2r/src/lib.rs +++ b/r2r/src/lib.rs @@ -124,6 +124,7 @@ pub use clocks::{Clock, ClockType}; mod nodes; pub use nodes::{Node, Timer}; +pub use r2r_tracing::StreamWithTracingData; pub mod qos; diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 435746a17..a9c4f5487 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -4,6 +4,7 @@ use futures::{ stream::{Stream, StreamExt}, }; use indexmap::IndexMap; +use r2r_tracing::{StreamWithTracingData, StreamWithTracingDataBuilder, TracingId}; use std::{ collections::HashMap, ffi::{CStr, CString}, @@ -290,151 +291,143 @@ impl Node { let node_name = self.name()?; // rcl_interfaces/srv/SetParameters - let set_params_request_stream = self + let params = self.params.clone(); + let params_struct_clone = params_struct.clone(); + let set_params_future = self .create_service::( &format!("{}/set_parameters", node_name), QosProfile::default(), - )?; - - let params = self.params.clone(); - let params_struct_clone = params_struct.clone(); - let set_params_future = set_params_request_stream.for_each( - move |req: ServiceRequest| { - let mut result = rcl_interfaces::srv::SetParameters::Response::default(); - for p in &req.message.parameters { - let val = ParameterValue::from_parameter_value_msg(p.value.clone()); - let changed = params - .lock() - .unwrap() - .get(&p.name) - .map(|v| v.value != val) - .unwrap_or(true); // changed=true if new - let r = if let Some(ps) = ¶ms_struct_clone { - // Update parameter structure - let result = ps.lock().unwrap().set_parameter(&p.name, &val); - if result.is_ok() { - // Also update Node::params + )? + .traced_callback( + move |req: ServiceRequest| { + let mut result = rcl_interfaces::srv::SetParameters::Response::default(); + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + let changed = params + .lock() + .unwrap() + .get(&p.name) + .map(|v| v.value != val) + .unwrap_or(true); // changed=true if new + let r = if let Some(ps) = ¶ms_struct_clone { + // Update parameter structure + let result = ps.lock().unwrap().set_parameter(&p.name, &val); + if result.is_ok() { + // Also update Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()); + } + rcl_interfaces::msg::SetParametersResult { + successful: result.is_ok(), + reason: result.err().map_or("".into(), |e| e.to_string()), + } + } else { + // No parameter structure - update only Node::params params .lock() .unwrap() .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()); - } - rcl_interfaces::msg::SetParametersResult { - successful: result.is_ok(), - reason: result.err().map_or("".into(), |e| e.to_string()), - } - } else { - // No parameter structure - update only Node::params - params - .lock() - .unwrap() - .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()) - .or_insert(Parameter::new(val.clone())); - rcl_interfaces::msg::SetParametersResult { - successful: true, - reason: "".into(), - } - }; - // if the value changed, send out new value on parameter event stream - if changed && r.successful { - if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) { - log::debug!("Warning: could not send parameter event ({}).", e); + .and_modify(|p| p.value = val.clone()) + .or_insert(Parameter::new(val.clone())); + rcl_interfaces::msg::SetParametersResult { + successful: true, + reason: "".into(), + } + }; + // if the value changed, send out new value on parameter event stream + if changed && r.successful { + if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) { + log::debug!("Warning: could not send parameter event ({}).", e); + } } + result.results.push(r); } - result.results.push(r); - } - req.respond(result) - .expect("could not send reply to set parameter request"); - future::ready(()) - }, - ); + req.respond(result) + .expect("could not send reply to set parameter request"); + }, + ); handlers.push(Box::pin(set_params_future)); // rcl_interfaces/srv/GetParameters - let get_params_request_stream = self + let params = self.params.clone(); + let params_struct_clone = params_struct.clone(); + let get_params_future = self .create_service::( &format!("{}/get_parameters", node_name), QosProfile::default(), - )?; - - let params = self.params.clone(); - let params_struct_clone = params_struct.clone(); - let get_params_future = get_params_request_stream.for_each( - move |req: ServiceRequest| { - let params = params.lock().unwrap(); - let values = req - .message - .names - .iter() - .map(|n| { - // First try to get the parameter from the param structure - if let Some(ps) = ¶ms_struct_clone { - if let Ok(value) = ps.lock().unwrap().get_parameter(n) { - return value; + )? + .traced_callback( + move |req: ServiceRequest| { + let params = params.lock().unwrap(); + let values = req + .message + .names + .iter() + .map(|n| { + // First try to get the parameter from the param structure + if let Some(ps) = ¶ms_struct_clone { + if let Ok(value) = ps.lock().unwrap().get_parameter(n) { + return value; + } } - } - // Otherwise get it from node HashMap - match params.get(n) { - Some(v) => v.value.clone(), - None => ParameterValue::NotSet, - } - }) - .map(|v| v.into_parameter_value_msg()) - .collect::>(); + // Otherwise get it from node HashMap + match params.get(n) { + Some(v) => v.value.clone(), + None => ParameterValue::NotSet, + } + }) + .map(|v| v.into_parameter_value_msg()) + .collect::>(); - let result = rcl_interfaces::srv::GetParameters::Response { values }; - req.respond(result) - .expect("could not send reply to set parameter request"); - future::ready(()) - }, - ); + let result = rcl_interfaces::srv::GetParameters::Response { values }; + req.respond(result) + .expect("could not send reply to set parameter request"); + }, + ); handlers.push(Box::pin(get_params_future)); + let params = self.params.clone(); + // rcl_interfaces/srv/ListParameters use rcl_interfaces::srv::ListParameters; - let list_params_request_stream = self.create_service::( - &format!("{}/list_parameters", node_name), - QosProfile::default(), - )?; - - let params = self.params.clone(); - let list_params_future = list_params_request_stream.for_each( - move |req: ServiceRequest| { + let list_params_future = self + .create_service::( + &format!("{}/list_parameters", node_name), + QosProfile::default(), + )? + .traced_callback(move |req: ServiceRequest| { Self::handle_list_parameters(req, ¶ms) - }, - ); + }); handlers.push(Box::pin(list_params_future)); // rcl_interfaces/srv/DescribeParameters use rcl_interfaces::srv::DescribeParameters; - let desc_params_request_stream = self.create_service::( - &format!("{node_name}/describe_parameters"), - QosProfile::default(), - )?; - let params = self.params.clone(); - let desc_params_future = desc_params_request_stream.for_each( - move |req: ServiceRequest| { + let desc_params_future = self + .create_service::( + &format!("{node_name}/describe_parameters"), + QosProfile::default(), + )? + .traced_callback(move |req: ServiceRequest| { Self::handle_desc_parameters(req, ¶ms) - }, - ); + }); handlers.push(Box::pin(desc_params_future)); // rcl_interfaces/srv/GetParameterTypes use rcl_interfaces::srv::GetParameterTypes; - let get_param_types_request_stream = self.create_service::( - &format!("{node_name}/get_parameter_types"), - QosProfile::default(), - )?; - let params = self.params.clone(); - let get_param_types_future = get_param_types_request_stream.for_each( - move |req: ServiceRequest| { + let get_param_types_future = self + .create_service::( + &format!("{node_name}/get_parameter_types"), + QosProfile::default(), + )? + .traced_callback(move |req: ServiceRequest| { let params = params.lock().unwrap(); let types = req .message @@ -447,86 +440,89 @@ impl Node { .collect(); req.respond(GetParameterTypes::Response { types }) .expect("could not send reply to get parameter types request"); - future::ready(()) - }, - ); + }); handlers.push(Box::pin(get_param_types_future)); // rcl_interfaces/srv/SetParametersAtomically - let set_params_atomically_request_stream = - self.create_service::( - &format!("{}/set_parameters_atomically", node_name), - QosProfile::default(), - )?; - let params = self.params.clone(); let params_struct_clone = params_struct.clone(); - let set_params_atomically_future = set_params_atomically_request_stream.for_each( - move |req: ServiceRequest| { - let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default(); - result.result.successful = true; - if let Some(ps) = ¶ms_struct_clone { - for p in &req.message.parameters { - let val = ParameterValue::from_parameter_value_msg(p.value.clone()); - if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) { - result.result.successful = false; - result.result.reason = format!("Can't set parameter {}: {}", p.name, e); - break; + let set_params_atomically_future = self + .create_service::( + &format!("{}/set_parameters_atomically", node_name), + QosProfile::default(), + )? + .traced_callback( + move |req: ServiceRequest< + rcl_interfaces::srv::SetParametersAtomically::Service, + >| { + let mut result = + rcl_interfaces::srv::SetParametersAtomically::Response::default(); + result.result.successful = true; + if let Some(ps) = ¶ms_struct_clone { + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) { + result.result.successful = false; + result.result.reason = + format!("Can't set parameter {}: {}", p.name, e); + break; + } } } - } - if result.result.successful { - // Since we checked them above now we assume these will be set ok... - for p in &req.message.parameters { - let val = ParameterValue::from_parameter_value_msg(p.value.clone()); - let changed = params - .lock() - .unwrap() - .get(&p.name) - .map(|v| v.value != val) - .unwrap_or(true); // changed=true if new - let r = if let Some(ps) = ¶ms_struct_clone { - // Update parameter structure - let result = ps.lock().unwrap().set_parameter(&p.name, &val); - if result.is_ok() { - // Also update Node::params + if result.result.successful { + // Since we checked them above now we assume these will be set ok... + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + let changed = params + .lock() + .unwrap() + .get(&p.name) + .map(|v| v.value != val) + .unwrap_or(true); // changed=true if new + let r = if let Some(ps) = ¶ms_struct_clone { + // Update parameter structure + let result = ps.lock().unwrap().set_parameter(&p.name, &val); + if result.is_ok() { + // Also update Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()); + } + rcl_interfaces::msg::SetParametersResult { + successful: result.is_ok(), + reason: result.err().map_or("".into(), |e| e.to_string()), + } + } else { + // No parameter structure - update only Node::params params .lock() .unwrap() .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()); - } - rcl_interfaces::msg::SetParametersResult { - successful: result.is_ok(), - reason: result.err().map_or("".into(), |e| e.to_string()), - } - } else { - // No parameter structure - update only Node::params - params - .lock() - .unwrap() - .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()) - .or_insert(Parameter::new(val.clone())); - rcl_interfaces::msg::SetParametersResult { - successful: true, - reason: "".into(), - } - }; - // if the value changed, send out new value on parameter event stream - if changed && r.successful { - if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) { - log::debug!("Warning: could not send parameter event ({}).", e); + .and_modify(|p| p.value = val.clone()) + .or_insert(Parameter::new(val.clone())); + rcl_interfaces::msg::SetParametersResult { + successful: true, + reason: "".into(), + } + }; + // if the value changed, send out new value on parameter event stream + if changed && r.successful { + if let Err(e) = + set_atomically_event_tx.try_send((p.name.clone(), val)) + { + log::debug!("Warning: could not send parameter event ({}).", e); + } } } } - } - req.respond(result) - .expect("could not send reply to set parameter request"); - future::ready(()) - }, - ); + req.respond(result) + .expect("could not send reply to set parameter request"); + }, + ); + handlers.push(Box::pin(set_params_atomically_future)); #[cfg(r2r__rosgraph_msgs__msg__Clock)] @@ -559,7 +555,7 @@ impl Node { fn handle_list_parameters( req: ServiceRequest, params: &Arc>>, - ) -> future::Ready<()> { + ) { use rcl_interfaces::srv::ListParameters; let depth = req.message.depth; @@ -596,13 +592,12 @@ impl Node { } req.respond(ListParameters::Response { result }) .expect("could not send reply to list parameter request"); - future::ready(()) } fn handle_desc_parameters( req: ServiceRequest, params: &Arc>>, - ) -> future::Ready<()> { + ) { use rcl_interfaces::{msg::ParameterDescriptor, srv::DescribeParameters}; let mut descriptors = Vec::::new(); let params = params.lock().unwrap(); @@ -618,7 +613,6 @@ impl Node { } req.respond(DescribeParameters::Response { descriptors }) .expect("could not send reply to describe parameters request"); - future::ready(()) } /// Fetch a single ROS parameter. @@ -649,19 +643,38 @@ impl Node { /// This function returns a `Stream` of ros messages. pub fn subscribe( &mut self, topic: &str, qos_profile: QosProfile, - ) -> Result + Unpin> + ) -> Result> where T: WrappedTypesupport, { - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; let (sender, receiver) = mpsc::channel::(10); - let ws = TypedSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(TypedSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + T::get_ts(), + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&*subscription).forget_type() + }); + + self.subscribers.push(subscription); Ok(receiver) } @@ -670,19 +683,38 @@ impl Node { /// This function returns a `Stream` of ros messages without the rust convenience types. pub fn subscribe_native( &mut self, topic: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> + ) -> Result>> where T: WrappedTypesupport, { - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); - let ws = NativeSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(NativeSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + T::get_ts(), + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&subscription.rcl_handle).forget_type() + }); + + self.subscribers.push(subscription); Ok(receiver) } @@ -692,18 +724,37 @@ impl Node { /// Useful when you cannot know the type of the message at compile time. pub fn subscribe_untyped( &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> { + ) -> Result>> { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); - let ws = UntypedSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(UntypedSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, topic_type: topic_type.to_string(), sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + msg.ts, + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&subscription.rcl_handle).forget_type() + }); + + self.subscribers.push(subscription); Ok(receiver) } @@ -713,7 +764,7 @@ impl Node { /// Useful if you just want to pass the data along to another part of the system. pub fn subscribe_raw( &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> { + ) -> Result>> { // TODO is it possible to handle the raw message without type support? // // Passing null ts to rcl_subscription_init throws an error .. @@ -739,16 +790,35 @@ impl Node { return Err(Error::from_rcl_error(ret)); } - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); - let ws = RawSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(RawSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, msg_buf, sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + msg.ts, + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&subscription.rcl_handle).forget_type() + }); + + self.subscribers.push(subscription); Ok(receiver) } @@ -758,24 +828,41 @@ impl Node { /// `respond` on the Service Request to send the reply. pub fn create_service( &mut self, service_name: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> + ) -> Result>> where T: WrappedServiceTypeSupport, { - let service_handle = create_service_helper( - self.node_handle.as_mut(), - service_name, - T::get_ts(), - qos_profile, - )?; let (sender, receiver) = mpsc::channel::>(10); - let ws = TypedService:: { - rcl_handle: service_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut service_arc = Arc::new(Mutex::new(TypedService:: { + rcl_handle: unsafe { rcl_get_zero_initialized_service() }, sender, + })); + let service_ref = Arc::get_mut(&mut service_arc) + .unwrap() // No other Arc should exist. The Arc was just created. + .get_mut() + .unwrap(); // The mutex was just created. It should not be poisoned. + + // SAFETY: + // The service was zero initialized above. + // Full initialization happens in `create_service_helper``. + unsafe { + create_service_helper( + &mut service_ref.rcl_handle, + self.node_handle.as_mut(), + service_name, + T::get_ts(), + qos_profile, + )?; }; - self.services.push(Arc::new(Mutex::new(ws))); + let receiver = StreamWithTracingDataBuilder::build_service(receiver, unsafe { + TracingId::new(&service_ref.rcl_handle) + }); + + // Only push after full initialization. + self.services.push(service_arc); Ok(receiver) } @@ -788,19 +875,30 @@ impl Node { where T: WrappedServiceTypeSupport, { - let client_handle = create_client_helper( - self.node_handle.as_mut(), - service_name, - T::get_ts(), - qos_profile, - )?; - let ws = TypedClient:: { - rcl_handle: client_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut client_arc = Arc::new(Mutex::new(TypedClient:: { + rcl_handle: unsafe { rcl_get_zero_initialized_client() }, response_channels: Vec::new(), poll_available_channels: Vec::new(), + })); + let client_ref = Arc::get_mut(&mut client_arc) + .unwrap() // No other Arc should exist. The Arc was just created. + .get_mut() + .unwrap(); // The mutex was just created. It should not be poisoned. + + // SAFETY: + // The client was zero initialized above. + // Full initialization happens in `create_client_helper`. + unsafe { + create_client_helper( + &mut client_ref.rcl_handle, + self.node_handle.as_mut(), + service_name, + T::get_ts(), + qos_profile, + )?; }; - let client_arc = Arc::new(Mutex::new(ws)); let c = make_client(Arc::downgrade(&client_arc)); self.clients.push(client_arc); Ok(c) @@ -816,20 +914,32 @@ impl Node { &mut self, service_name: &str, service_type: &str, qos_profile: QosProfile, ) -> Result { let service_type = UntypedServiceSupport::new_from(service_type)?; - let client_handle = create_client_helper( - self.node_handle.as_mut(), - service_name, - service_type.ts, - qos_profile, - )?; - let client = UntypedClient_ { + + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut client_arc = Arc::new(Mutex::new(UntypedClient_ { service_type, - rcl_handle: client_handle, + rcl_handle: unsafe { rcl_get_zero_initialized_client() }, response_channels: Vec::new(), poll_available_channels: Vec::new(), + })); + let client_ref = Arc::get_mut(&mut client_arc) + .unwrap() // No other Arc should exist. The Arc was just created. + .get_mut() + .unwrap(); // The mutex was just created. It should not be poisoned. + + // SAFETY: + // The client was zero initialized above. + // Full initialization happens in `create_client_helper`. + unsafe { + create_client_helper( + &mut client_ref.rcl_handle, + self.node_handle.as_mut(), + service_name, + client_ref.service_type.ts, + qos_profile, + )?; }; - let client_arc = Arc::new(Mutex::new(client)); let c = make_untyped_client(Arc::downgrade(&client_arc)); self.clients.push(client_arc); Ok(c) @@ -962,11 +1072,10 @@ impl Node { where T: WrappedTypesupport, { - let publisher_handle = + let publisher_arc = create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; - let arc = Arc::new(publisher_handle); - let p = make_publisher(Arc::downgrade(&arc)); - self.pubs.push(arc); + let p = make_publisher(Arc::downgrade(&publisher_arc)); + self.pubs.push(publisher_arc); Ok(p) } @@ -977,11 +1086,10 @@ impl Node { &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, ) -> Result { let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?; - let publisher_handle = + let publisher_arc = create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?; - let arc = Arc::new(publisher_handle); - let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned()); - self.pubs.push(arc); + let p = make_publisher_untyped(Arc::downgrade(&publisher_arc), topic_type.to_owned()); + self.pubs.push(publisher_arc); Ok(p) } @@ -1019,6 +1127,8 @@ impl Node { /// `timeout` is a duration specifying how long the spin should /// block for if there are no pending events. pub fn spin_once(&mut self, timeout: Duration) { + r2r_tracing::trace_spin_start(&*self.node_handle, timeout); + // first handle any completed action cancellation responses for a in &mut self.action_servers { a.lock().unwrap().send_completed_cancel_requests(); @@ -1182,9 +1292,12 @@ impl Node { unsafe { rcl_wait_set_fini(&mut ws); } + r2r_tracing::trace_spin_timeout(&*self.node_handle); return; } + r2r_tracing::trace_spin_wake(&*self.node_handle); + let mut subs_to_remove = vec![]; if ws.subscriptions != std::ptr::null_mut() { let ws_subs = @@ -1341,6 +1454,8 @@ impl Node { unsafe { rcl_wait_set_fini(&mut ws); } + + r2r_tracing::trace_spin_end(&*self.node_handle); } /// Returns a map of topic names and type names of the publishers @@ -1444,9 +1559,15 @@ impl Node { _clock: Some(clock), // The timer owns the clock. sender: tx, }; - self.timers.push(timer); - let out_timer = Timer { receiver: rx }; + let out_timer = unsafe { + Timer { + receiver: rx, + timer_handle: TracingId::new(timer.get_handle()), + node_handle: TracingId::new(&*self.node_handle), + } + }; + self.timers.push(timer); Ok(out_timer) } @@ -1467,9 +1588,15 @@ impl Node { _clock: None, // The timer does not own the clock (the node owns it). sender: tx, }; - self.timers.push(timer); - let out_timer = Timer { receiver: rx }; + let out_timer = unsafe { + Timer { + receiver: rx, + timer_handle: TracingId::new(timer.get_handle()), + node_handle: TracingId::new(&*self.node_handle), + } + }; + self.timers.push(timer); Ok(out_timer) } @@ -1601,6 +1728,8 @@ impl Drop for Timer_ { /// A ROS timer. pub struct Timer { receiver: mpsc::Receiver, + timer_handle: TracingId, + node_handle: TracingId, } impl Timer { @@ -1615,6 +1744,23 @@ impl Timer { Err(Error::RCL_RET_TIMER_INVALID) } } + + /// Transforms this timer stream to a [`Future`] calling the given `callback` on each tick. + /// + /// The callback execution is traced by r2r_tracing. + /// + /// This function should be called before dropping the timer's node. + /// Otherwise, the trace data might be inconsistent. + #[must_use = "Futures do nothing unless you `.await` or poll them"] + pub fn on_tick(self, callback: F) -> impl Future + Unpin { + let mut callback = r2r_tracing::Callback::new_timer(self.timer_handle, callback); + r2r_tracing::trace_timer_link_node(self.timer_handle, self.node_handle); + + self.receiver.for_each(move |duration| { + callback.call(duration); + future::ready(()) + }) + } } // Since publishers are temporarily upgraded to owners during the diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index cb3bd3bd7..332d80db3 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -3,7 +3,7 @@ use std::{ ffi::{c_void, CString}, fmt::Debug, marker::PhantomData, - sync::{Mutex, Once, Weak}, + sync::{Arc, Mutex, Once, Weak}, }; use crate::{error::*, msg_types::*, qos::QosProfile}; @@ -137,15 +137,24 @@ pub fn make_publisher_untyped(handle: Weak, type_: String) -> Publis pub fn create_publisher_helper( node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t, qos_profile: QosProfile, -) -> Result { - let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; +) -> Result> { let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + // Allocate the memory now so that the location of the rcl handle + // does not change after call to rcl_publisher_init. + // This is important because tracing in rcl expects the handle to be at a fixed location. + let mut publisher_arc = Arc::new(Publisher_ { + handle: unsafe { rcl_get_zero_initialized_publisher() }, + poll_inter_process_subscriber_channels: Mutex::new(Vec::new()), + }); + let publisher_mut = Arc::get_mut(&mut publisher_arc) + .expect("No other Arc should exist. The Arc was just created."); + let result = unsafe { let mut publisher_options = rcl_publisher_get_default_options(); publisher_options.qos = qos_profile.into(); rcl_publisher_init( - &mut publisher_handle, + &mut publisher_mut.handle, node, typesupport, topic_c_string.as_ptr(), @@ -153,10 +162,7 @@ pub fn create_publisher_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(Publisher_ { - handle: publisher_handle, - poll_inter_process_subscriber_channels: Mutex::new(Vec::new()), - }) + Ok(publisher_arc) } else { Err(Error::from_rcl_error(result)) } @@ -176,6 +182,8 @@ impl PublisherUntyped { let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?; native_msg.from_json(msg)?; + r2r_tracing::trace_publish(native_msg.void_ptr()); + let result = unsafe { rcl_publish( &publisher.handle as *const rcl_publisher_t, @@ -214,6 +222,8 @@ impl PublisherUntyped { allocator: unsafe { rcutils_get_default_allocator() }, }; + r2r_tracing::trace_publish((&msg_buf as *const rcl_serialized_message_t).cast::()); + let result = unsafe { rcl_publish_serialized_message( &publisher.handle, @@ -271,6 +281,9 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); + + r2r_tracing::trace_publish(native_msg.void_ptr()); + let result = unsafe { rcl_publish( &publisher.handle as *const rcl_publisher_t, @@ -357,6 +370,8 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; + r2r_tracing::trace_publish(msg.void_ptr()); + let result = if msg.is_loaned { unsafe { // signal that we are relinquishing responsibility of the memory diff --git a/r2r/src/services.rs b/r2r/src/services.rs index e4fd8c846..c9c820b3d 100644 --- a/r2r/src/services.rs +++ b/r2r/src/services.rs @@ -111,11 +111,13 @@ where } } -pub fn create_service_helper( - node: &mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t, - qos_profile: QosProfile, -) -> Result { - let mut service_handle = unsafe { rcl_get_zero_initialized_service() }; +/// Initializes the service. +/// +/// SAFETY: requires that the service handle is zero initialized by [`rcl_get_zero_initialized_service`]. +pub unsafe fn create_service_helper( + service_handle: &mut rcl_service_t, node: &mut rcl_node_t, service_name: &str, + service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile, +) -> Result<()> { let service_name_c_string = CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; @@ -123,7 +125,7 @@ pub fn create_service_helper( let mut service_options = rcl_service_get_default_options(); service_options.qos = qos_profile.into(); rcl_service_init( - &mut service_handle, + service_handle, node, service_ts, service_name_c_string.as_ptr(), @@ -131,7 +133,7 @@ pub fn create_service_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(service_handle) + Ok(()) } else { Err(Error::from_rcl_error(result)) } diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 5ee510209..10cb133aa 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -55,6 +55,8 @@ where rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) }; if ret == RCL_RET_OK as i32 { + r2r_tracing::trace_take_ptr(msg.void_ptr()); + let msg = T::from_native(&msg); if let Err(e) = self.sender.try_send(msg) { if e.is_disconnected() { @@ -138,6 +140,9 @@ where new_msg } }; + + r2r_tracing::trace_take_ptr(msg.void_ptr()); + if let Err(e) = self.sender.try_send(msg) { if e.is_disconnected() { // user dropped the handle to the stream, signal removal. @@ -168,6 +173,8 @@ impl Subscriber_ for UntypedSubscriber { rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) }; if ret == RCL_RET_OK as i32 { + r2r_tracing::trace_take_ptr(msg.void_ptr()); + let json = msg.to_json(); if let Err(e) = self.sender.try_send(json) { if e.is_disconnected() { @@ -215,6 +222,8 @@ impl Subscriber_ for RawSubscriber { } }; + r2r_tracing::trace_take(&self.msg_buf); + if let Err(e) = self.sender.try_send(data_bytes) { if e.is_disconnected() { // user dropped the handle to the stream, signal removal. @@ -234,18 +243,17 @@ impl Subscriber_ for RawSubscriber { } } -pub fn create_subscription_helper( - node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t, - qos_profile: QosProfile, -) -> Result { - let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() }; +pub unsafe fn create_subscription_helper( + subscription_handle: &mut rcl_subscription_t, node: &mut rcl_node_t, topic: &str, + ts: *const rosidl_message_type_support_t, qos_profile: QosProfile, +) -> Result<()> { let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; let result = unsafe { let mut subscription_options = rcl_subscription_get_default_options(); subscription_options.qos = qos_profile.into(); rcl_subscription_init( - &mut subscription_handle, + subscription_handle, node, ts, topic_c_string.as_ptr(), @@ -253,7 +261,7 @@ pub fn create_subscription_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(subscription_handle) + Ok(()) } else { Err(Error::from_rcl_error(result)) } diff --git a/r2r/src/time_source.rs b/r2r/src/time_source.rs index 91b1a09c6..4b2623c5a 100644 --- a/r2r/src/time_source.rs +++ b/r2r/src/time_source.rs @@ -9,14 +9,18 @@ use crate::{ Clock, ClockType, Node, QosProfile, WrappedTypesupport, }; use r2r_rcl::{ - rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t, - rmw_message_info_t, RCL_RET_OK, + rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t, + rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK, +}; +use r2r_tracing::TracingId; +use std::{ + any::type_name, + sync::{Arc, Mutex, Weak}, }; -use std::sync::{Arc, Mutex, Weak}; /// Provides time from `/clock` topic to attached ROS clocks /// -/// By default only clock used by ROS timers is attached and time from `/clock` topic is disabled. +/// By default, only clock used by ROS timers is attached and time from `/clock` topic is disabled. /// /// The time from `/clock` topic can be activated by either of these: /// - calling [`TimeSource::enable_sim_time`] @@ -102,7 +106,7 @@ impl TimeSource { match inner.subscriber_state { TimeSourceSubscriberState::None => { let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?; - node.subscribers.push(Box::new(subscriber)); + node.subscribers.push(subscriber); inner.subscriber_state = TimeSourceSubscriberState::Active; } TimeSourceSubscriberState::ToBeDestroyed => { @@ -191,20 +195,39 @@ impl TimeSource_ { } impl TimeSourceSubscriber { - fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result { + fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result> { // The values are set based on default values in rclcpp let qos = QosProfile::default().keep_last(1).best_effort(); - let subscriber = create_subscription_helper( - node_handle, - "/clock", - crate::rosgraph_msgs::msg::Clock::get_ts(), - qos, - )?; - Ok(Self { - subscriber_handle: subscriber, + let mut subscriber = Box::new(Self { + subscriber_handle: unsafe { rcl_get_zero_initialized_subscription() }, time_source, - }) + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscriber.subscriber_handle, + node_handle, + "/clock", + crate::rosgraph_msgs::msg::Clock::get_ts(), + qos, + )? + }; + + // Use TimeSource inner arc address as callback_id. We hope that this id is much larger value than + // any value generated by `r2r_tracing::Callback::new` which uses sequential values from 1. + let callback_id = Arc::as_ptr(&subscriber.time_source.inner) as usize; + r2r_tracing::trace_subscription_init(&subscriber.subscriber_handle, &*subscriber); + r2r_tracing::trace_subscription_callback_added( + unsafe { TracingId::new(&*subscriber).forget_type() }, + callback_id, + ); + r2r_tracing::trace_callback_register(callback_id, type_name::()); + + Ok(subscriber) } } @@ -226,11 +249,16 @@ impl Subscriber_ for TimeSourceSubscriber { ) }; + r2r_tracing::trace_take_ptr(clock_msg.void_ptr()); + let mut inner_time_source = self.time_source.inner.lock().unwrap(); if ret == RCL_RET_OK as i32 { let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg); + let time_source_ptr = Arc::as_ptr(&self.time_source.inner) as usize; + r2r_tracing::trace_callback_start(time_source_ptr, false); inner_time_source.set_clock_time(msg.clock); + r2r_tracing::trace_callback_end(time_source_ptr); } match inner_time_source.subscriber_state { diff --git a/r2r_tracing/Cargo.toml b/r2r_tracing/Cargo.toml new file mode 100644 index 000000000..29d148c22 --- /dev/null +++ b/r2r_tracing/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "r2r_tracing" +description = "Internal dependency containing tracepoint definitions or imports for r2r." +authors = ["Martin Škoudlil "] +version = "0.9.5" +license = "MIT" +edition = "2021" +repository = "https://github.com/sequenceplanner/r2r" +readme = "README.md" + +[dependencies] +futures = "0.3.25" +r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" } + +[build-dependencies] +lttng-ust-generate = { git = "https://github.com/skoudmar/lttng-ust-rs.git", rev = "bd437b5eeb8c4378d78ebb5aa17f0f4a95828b32", optional = true} +r2r_common = { path = "../r2r_common", version = "0.9.5", optional = true} + +[features] +tracing = ["dep:lttng-ust-generate", "dep:r2r_common"] diff --git a/r2r_tracing/README.md b/r2r_tracing/README.md new file mode 100644 index 000000000..347332fe5 --- /dev/null +++ b/r2r_tracing/README.md @@ -0,0 +1,48 @@ +# r2r_tracing + +Internal dependency containing tracepoint definitions or imports from `tracetools` ROS package for r2r. + +Uses LTTng tracing framework. + +## Feature flag `tracing` + +The crate will generate and link tracepoint libraries only when the feature flag is enabled. + +Without specifying the feature flag `tracing` all exported functions are No-ops. + +## Depends on + +- `tracetools` ROS package + - This package is a part of ROS distribution. + - `r2r_tracing` dynamically loads `tracetools` library to obtain tracepoints used by `rclcpp`. +- `lttng-ust` crate + - To define additional tracepoints. + +## Recording traces of R2R applications + +Make sure to enable feature flag `tracing`. + +Then start tracing session: + +- Either by installing [`ros2trace`](https://index.ros.org/p/ros2trace/) and running: + + ```sh + ros2 trace -u 'ros2:*' 'r2r:*' + ``` + + The traces will be available in `$HOME/.ros/tracing/session-`. + +- Alternatively, you can trace your application directly with LTTng: + + ```sh + # Session name is an optional user-chosen name for the trace + lttng create [session-name] + lttng enable-event -u 'ros2:*,r2r:*' + lttng add-context -u --type=vtid --type=vpid --type=procname + lttng start + # Start the ROS system here. + # Let it run for as long as you want to trace it. + lttng destroy + ``` + + The traces will be available in `$HOME/lttng-traces/-`. diff --git a/r2r_tracing/build.rs b/r2r_tracing/build.rs new file mode 100644 index 000000000..2adef2ae4 --- /dev/null +++ b/r2r_tracing/build.rs @@ -0,0 +1,76 @@ +#[cfg(all(feature = "tracing", not(target_os = "linux")))] +compile_error!("Feature 'tracing' is only available on Linux."); + +#[cfg(not(feature = "tracing"))] +fn main() {} + +#[cfg(feature = "tracing")] +fn main() { + use tracing::{generate_r2r_tracepoints, generate_rclcpp_tracepoint_bindings}; + + generate_rclcpp_tracepoint_bindings(); + generate_r2r_tracepoints(); +} + +#[cfg(feature = "tracing")] +mod tracing { + use lttng_ust_generate::{CIntegerType, CTFType, Generator, Provider}; + use std::{env, path::PathBuf}; + + macro_rules! create_tracepoint { + ($provider:ident::$name:ident($($arg_name:ident: $arg_lttng_type:expr),* $(,)?)) => { + $provider.create_class(concat!(stringify!($name), "_class")) + $( + .add_field(stringify!($arg_name), $arg_lttng_type) + )* + .instantiate(stringify!($name)) + }; + } + + /// Generate bindings to the rclcpp tracepoints defined in the tracetools ros2 package. + pub(crate) fn generate_rclcpp_tracepoint_bindings() { + let bindings = r2r_common::setup_bindgen_builder() + .header("src/tracetools_wrapper.h") + .allowlist_function("ros_trace_rclcpp_.*") + .allowlist_function("ros_trace_callback_.*") + .generate_comments(true) + .generate() + .expect("Unable to generate bindings for tracetools"); + + println!("cargo:rustc-link-lib=tracetools"); + + // Write the bindings to the $OUT_DIR/tracetools_bindings.rs file. + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); + bindings + .write_to_file(out_path.join("tracetools_bindings.rs")) + .expect("Couldn't write tracetools bindings!"); + } + + pub(crate) fn generate_r2r_tracepoints() { + let mut r2r = Provider::new("r2r"); + + create_tracepoint!(r2r::spin_start( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + timeout_s: CTFType::Integer(CIntegerType::U64), + timeout_ns: CTFType::Integer(CIntegerType::U32), + )); + create_tracepoint!(r2r::spin_end( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + )); + create_tracepoint!(r2r::spin_wake( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + )); + create_tracepoint!(r2r::spin_timeout( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + )); + + Generator::default() + .generated_lib_name("r2r_tracepoints_r2r") + .register_provider(r2r) + .output_file_name( + PathBuf::from(env::var("OUT_DIR").unwrap()).join("r2r_tracepoints.rs"), + ) + .generate() + .expect("Unable to generate tracepoint bindings for r2r"); + } +} diff --git a/r2r_tracing/src/callback.rs b/r2r_tracing/src/callback.rs new file mode 100644 index 000000000..40082ac10 --- /dev/null +++ b/r2r_tracing/src/callback.rs @@ -0,0 +1,81 @@ +use crate::{ + trace_callback_register, trace_service_callback_added, trace_subscription_callback_added, + trace_timer_callback_added, TracingId, +}; +use r2r_rcl::{rcl_service_t, rcl_timer_t}; +use std::{ + any::type_name, + ffi::c_void, + marker::PhantomData, + sync::atomic::{AtomicUsize, Ordering::Relaxed}, +}; + +/// Tracing wrapper for callback +pub struct Callback +where + F: FnMut(M), +{ + func: F, + id: usize, + msg_type: PhantomData, +} + +impl Callback +where + F: FnMut(M), +{ + /// Generates unique ID for the callback + fn gen_id() -> usize { + static COUNTER: AtomicUsize = AtomicUsize::new(1); + COUNTER.fetch_add(1, Relaxed) + } + + fn new(callback: F, id: usize) -> Self { + trace_callback_register(id, type_name::()); + + Self { + func: callback, + id, + msg_type: PhantomData, + } + } + + /// Emits trace event associating this `callback` with the `service`. + /// + /// Wraps the callback to allow tracing the callback calls. + pub fn new_service(service: TracingId, callback: F) -> Self { + let id = Self::gen_id(); + trace_service_callback_added(service, id); + + Self::new(callback, id) + } + + /// Emits trace event associating this `callback` with the `timer`. + /// + /// Wraps the callback to allow tracing the callback calls. + pub fn new_timer(timer: TracingId, callback: F) -> Self { + let id = Self::gen_id(); + trace_timer_callback_added(timer, id); + + Self::new(callback, id) + } + + /// Emits trace event associating this `callback` with the `subscription`. + /// + /// Wraps the callback to allow tracing the callback calls. + pub fn new_subscription(subscriber: TracingId, callback: F) -> Self { + let id = Self::gen_id(); + trace_subscription_callback_added(subscriber, id); + + Self::new(callback, id) + } + + /// Call the `callback`. + /// This emits `ros2:callback_start` and `ros2:callback_end` events at + /// the beginning and end respectively. + pub fn call(&mut self, msg: M) { + crate::trace_callback_start(self.id, false); + (self.func)(msg); + crate::trace_callback_end(self.id); + } +} diff --git a/r2r_tracing/src/lib.rs b/r2r_tracing/src/lib.rs new file mode 100644 index 000000000..7d52566f3 --- /dev/null +++ b/r2r_tracing/src/lib.rs @@ -0,0 +1,25 @@ +//! Tracepoint provider for the `r2r` crate. + +#[cfg(feature = "tracing")] +mod r2r_tracepoints_bindings; + +#[cfg(feature = "tracing")] +mod tracetools_bindings; + +mod rclcpp_tracepoints; +pub use rclcpp_tracepoints::*; + +mod r2r_tracepoints; +pub use r2r_tracepoints::*; + +mod callback; +pub use callback::Callback; + +mod stream; +pub use stream::{StreamWithTracingData, StreamWithTracingDataBuilder}; + +mod tracing_id; +pub use tracing_id::TracingId; + +mod macros; +use macros::tracepoint_fn; diff --git a/r2r_tracing/src/macros.rs b/r2r_tracing/src/macros.rs new file mode 100644 index 000000000..8729744f3 --- /dev/null +++ b/r2r_tracing/src/macros.rs @@ -0,0 +1,26 @@ +/// Generates two versions of each function definition wrapped via this macro: +/// - One with tracing enabled: Adds a feature flag and copies the function signature and body as is. +/// - One with tracing disabled: Adds a feature flag and replaces the function body to be empty. +macro_rules! tracepoint_fn { + ($($(#[$atts:meta])* $vi:vis fn $name:ident$(<$generic:tt>)?($($arg_name:ident : $arg_ty:ty),* $(,)?) $(-> $ret_ty:ty)? $body:block)*) => { + $( + $(#[$atts])* + #[inline] + #[cfg(feature = "tracing")] + $vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? $body + + $(#[$atts])* + #[doc = ""] // Empty doc line to start a new paragraph. + #[doc = "Tracing is currently disabled so this function is a no-op. Enable `tracing` feature to enable tracing."] + #[inline(always)] + #[allow(unused_variables)] + #[cfg(not(feature = "tracing"))] + $vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? { + // Do nothing + } + )* + }; +} + +// set macro visibility to public in crate only +pub(crate) use tracepoint_fn; diff --git a/r2r_tracing/src/r2r_tracepoints.rs b/r2r_tracing/src/r2r_tracepoints.rs new file mode 100644 index 000000000..027468d41 --- /dev/null +++ b/r2r_tracing/src/r2r_tracepoints.rs @@ -0,0 +1,32 @@ +#[cfg(feature = "tracing")] +use crate::r2r_tracepoints_bindings::r2r as tp; +use crate::tracepoint_fn; +use r2r_rcl::rcl_node_t; +use std::time::Duration; + +tracepoint_fn! { +/// The `node` started spinning with given `timeout`. +pub fn trace_spin_start(node: *const rcl_node_t, timeout: Duration) { + let timeout_s = timeout.as_secs(); + let timeout_ns = timeout.subsec_nanos(); + + tp::spin_start(node as usize, timeout_s, timeout_ns); +} + +/// The `node` ended spinning function. +/// +/// If the spinning function ended by a timeout, use `trace_spin_timeout` instead. +pub fn trace_spin_end(node: *const rcl_node_t) { + tp::spin_end(node as usize); +} + +/// The `node` woke up from waiting on a wait set without reaching timeout. +pub fn trace_spin_wake(node: *const rcl_node_t) { + tp::spin_wake(node as usize); +} + +/// The `node` timeouted while spinning +pub fn trace_spin_timeout(node: *const rcl_node_t) { + tp::spin_timeout(node as usize); +} +} diff --git a/r2r_tracing/src/r2r_tracepoints_bindings.rs b/r2r_tracing/src/r2r_tracepoints_bindings.rs new file mode 100644 index 000000000..7f23d6c95 --- /dev/null +++ b/r2r_tracing/src/r2r_tracepoints_bindings.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/r2r_tracepoints.rs")); diff --git a/r2r_tracing/src/rclcpp_tracepoints.rs b/r2r_tracing/src/rclcpp_tracepoints.rs new file mode 100644 index 000000000..655276d77 --- /dev/null +++ b/r2r_tracing/src/rclcpp_tracepoints.rs @@ -0,0 +1,161 @@ +use crate::{tracepoint_fn, TracingId}; +use r2r_rcl::{rcl_node_t, rcl_service_t, rcl_subscription_t, rcl_timer_t}; + +#[cfg(feature = "tracing")] +use crate::tracetools_bindings as tp; +#[cfg(feature = "tracing")] +use std::{ + ffi::{c_void, CString}, + ptr::null, +}; + +#[cfg(feature = "tracing")] +const fn ref_to_c_void(t: &T) -> *const std::ffi::c_void { + std::ptr::from_ref(t).cast() +} + +#[cfg(feature = "tracing")] +macro_rules! c_void { + ($e:ident) => { + ($e) as *const std::ffi::c_void + }; +} + +// Documentation of tracepoints is based on https://github.com/ros2/ros2_tracing project. +// From file `tracetools/include/tracetools/tracetools.h` + +tracepoint_fn! { +/// Message publication. +/// +/// Records the pointer to the `message` being published at the `rclcpp`/`r2r` level. +/// +/// Tracepoint: `ros2::rclcpp_publish +// Lint allow note: The message pointer is NOT dereferenced. +#[allow(clippy::not_unsafe_ptr_arg_deref)] +pub fn trace_publish(message: *const std::ffi::c_void) { + unsafe { + // first argument documentation: + // publisher_handle not used, but kept for API/ABI stability + tp::ros_trace_rclcpp_publish(null(), message); + } +} + +/// Subscription object initialization. +/// +/// Tracepoint to allow associating the `subscription_handle` from `rcl` with the address of rust `subscription` reference. +/// There can be more than 1 `subscription` for 1 `subscription_handle`. +/// +/// Tracepoint: `ros2::rclcpp_subscription_init` +pub fn trace_subscription_init( + subscription_handle: *const rcl_subscription_t, subscription: &S, +) { + unsafe { + tp::ros_trace_rclcpp_subscription_init( + subscription_handle.cast(), + ref_to_c_void(subscription), + ); + } +} + +/// Tracepoint to allow associating the subscription callback identified by `callback_id` with the `subscription` object. +/// +/// Tracepoint: `ros2::rclcpp_subscription_callback_added` +pub fn trace_subscription_callback_added(subscription: TracingId, callback_id: usize) { + unsafe { + tp::ros_trace_rclcpp_subscription_callback_added( + subscription.c_void(), + c_void!(callback_id), + ); + } +} + +/// Message taking. +/// +/// Records the **reference** to the `message` being taken at the `rclcpp`/`r2r` level. +/// +/// To trace messages pointed to by void pointer use [`trace_take_ptr`]. +/// +/// Tracepoint: `ros2::rclcpp_take` +pub fn trace_take(message: &M) { + unsafe { + tp::ros_trace_rclcpp_take(ref_to_c_void(message)); + } +} + +/// Message taking. +/// +/// Records the **void pointer** to the `message` being taken at the `rclcpp`/`r2r` level. +/// +/// To trace messages by their reference use [`trace_take`]. +/// +/// Tracepoint: `ros2::rclcpp_take` +// Lint allow note: The message pointer is NOT dereferenced. +#[allow(clippy::not_unsafe_ptr_arg_deref)] +pub fn trace_take_ptr(message: *const std::ffi::c_void) { + unsafe { + tp::ros_trace_rclcpp_take(message); + } +} + +/// Tracepoint to allow associating the service callback identified by `callback_id` with a `service`. +/// +/// Tracepoint: `ros2::rclcpp_service_callback_added` +pub fn trace_service_callback_added(service: TracingId, callback_id: usize) { + unsafe { tp::ros_trace_rclcpp_service_callback_added(service.c_void(), c_void!(callback_id)) } +} + +/// Tracepoint to allow associating the timer callback identified by `callback_id` with its `rcl_timer_t` handle. +/// +/// Tracepoint: `ros2::rclcpp_timer_callback_added` +pub fn trace_timer_callback_added(timer: TracingId, callback_id: usize) { + unsafe { + tp::ros_trace_rclcpp_timer_callback_added(timer.c_void(), c_void!(callback_id)); + } +} + +/// Tracepoint to allow associating the `timer` with a `node`. +/// +/// Tracepoint: `ros2::rclcpp_timer_link_node` +pub fn trace_timer_link_node(timer: TracingId, node: TracingId) { + unsafe { + tp::ros_trace_rclcpp_timer_link_node(timer.c_void(), node.c_void()); + } +} + +/// Tracepoint to allow associating demangled `function_symbol` with a `callback_id`. +/// +/// Allocates memory to store the function symbol as a `CString`. +/// +/// Tracepoint: `ros2::callback_register` +/// +/// # Panics +/// If `function_symbol` contains a null byte. +pub fn trace_callback_register(callback_id: usize, function_symbol: &str) { + let function_symbol = CString::new(function_symbol) + .expect("r2r tracing: Cannot convert function_symbol to CString. It contains null byte."); + + unsafe { + tp::ros_trace_rclcpp_callback_register(c_void!(callback_id), function_symbol.as_ptr()); + } +} + +/// Start of a callback +/// +/// Set `is_intra_process` depending on whether this callback is done via intra-process or not +/// +/// Tracepoint: `ros2::callback_start` +pub fn trace_callback_start(callback_id: usize, is_intra_process: bool) { + unsafe { + tp::ros_trace_callback_start(c_void!(callback_id), is_intra_process); + } +} + +/// End of a callback. +/// +/// Tracepoint: `ros2::callback_end` +pub fn trace_callback_end(callback_id: usize) { + unsafe { + tp::ros_trace_callback_end(c_void!(callback_id)); + } +} +} diff --git a/r2r_tracing/src/stream.rs b/r2r_tracing/src/stream.rs new file mode 100644 index 000000000..fc2189962 --- /dev/null +++ b/r2r_tracing/src/stream.rs @@ -0,0 +1,136 @@ +use std::{ + ffi::c_void, + future::{self, Future}, +}; + +use futures::{channel::mpsc::Receiver, stream::FusedStream, Stream, StreamExt as _}; +use r2r_rcl::rcl_service_t; + +#[cfg(feature = "tracing")] +use crate::Callback; +use crate::TracingId; + +/// A stream wrapper containing tracing data. +/// +/// When the `tracing` feature is enabled, you can use the [`Self::traced_callback`] method +/// to trace the execution of the callback. Stream polling is not traced. +#[derive(Debug)] +pub struct StreamWithTracingData { + stream: Receiver, + + #[cfg(feature = "tracing")] + tracing_id: TracingIdWithType, +} + +#[cfg(feature = "tracing")] +#[derive(Debug, Clone, Copy)] +enum TracingIdWithType { + // The type of subscription is `c_void` because the actual type would + // be a generic R2R subscriber and not `rcl_subscription_t`. + Subscription(TracingId), + Service(TracingId), +} + +impl StreamWithTracingData { + /// Converts the stream into a future that calls the provided callback for each message. + /// + /// If `tracing` feature is enabled: + /// - Each time the callback is called, its start and end time will be traced by the + /// ROS 2 tracing framework. + /// - You should not poll the stream before calling this method because + /// otherwise it might confuse software that analyzes the trace. + pub fn traced_callback(self, callback: C) -> impl Future + Unpin + where + C: FnMut(T), + { + #[cfg(feature = "tracing")] + { + let mut callback_wrapper = match self.tracing_id { + TracingIdWithType::Subscription(id) => Callback::new_subscription(id, callback), + TracingIdWithType::Service(id) => Callback::new_service(id, callback), + }; + + self.stream.for_each(move |msg| { + callback_wrapper.call(msg); + future::ready(()) + }) + } + #[cfg(not(feature = "tracing"))] + { + let mut callback = callback; + self.stream.for_each(move |msg| { + callback(msg); + future::ready(()) + }) + } + } +} + +impl Stream for StreamWithTracingData { + type Item = T; + + fn poll_next( + self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + this.stream.poll_next_unpin(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +impl FusedStream for StreamWithTracingData { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +/// A builder for `StreamWithTracingData`. +/// +/// This struct exists to allow creation of `StreamWithTracingData` without polluting its public API. +/// It is used internally by r2r and should not be reexported. +pub struct StreamWithTracingDataBuilder; + +impl StreamWithTracingDataBuilder { + #[must_use] + #[allow( + clippy::not_unsafe_ptr_arg_deref, + reason = "The pointer is not dereferenced" + )] + #[cfg_attr( + not(feature = "tracing"), + expect( + unused_variables, + reason = "service_id is not saved if tracing is disabled" + ) + )] + pub fn build_service( + stream: Receiver, service_id: TracingId, + ) -> StreamWithTracingData { + StreamWithTracingData { + stream, + #[cfg(feature = "tracing")] + tracing_id: TracingIdWithType::Service(service_id), + } + } + + #[must_use] + #[cfg_attr( + not(feature = "tracing"), + expect( + unused_variables, + reason = "subscription_id is not saved if tracing is disabled" + ) + )] + pub fn build_subscription( + stream: Receiver, subscription_id: TracingId, + ) -> StreamWithTracingData { + StreamWithTracingData { + stream, + #[cfg(feature = "tracing")] + tracing_id: TracingIdWithType::Subscription(subscription_id), + } + } +} diff --git a/r2r_tracing/src/tracetools_bindings.rs b/r2r_tracing/src/tracetools_bindings.rs new file mode 100644 index 000000000..dc33f2641 --- /dev/null +++ b/r2r_tracing/src/tracetools_bindings.rs @@ -0,0 +1,6 @@ +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(unused)] + +include!(concat!(env!("OUT_DIR"), "/tracetools_bindings.rs")); diff --git a/r2r_tracing/src/tracetools_wrapper.h b/r2r_tracing/src/tracetools_wrapper.h new file mode 100644 index 000000000..f50256eae --- /dev/null +++ b/r2r_tracing/src/tracetools_wrapper.h @@ -0,0 +1 @@ +#include diff --git a/r2r_tracing/src/tracing_id.rs b/r2r_tracing/src/tracing_id.rs new file mode 100644 index 000000000..950f4dbf9 --- /dev/null +++ b/r2r_tracing/src/tracing_id.rs @@ -0,0 +1,77 @@ +/// Unique identifier for tracing purposes +#[derive(Debug)] +pub struct TracingId { + /// Pointer to the object used as a unique ID. + /// Safety: Do NOT dereference the pointer. + #[cfg(feature = "tracing")] + id: *const T, + + /// Marker for the type. Needed when `tracing` feature is disabled. + #[cfg(not(feature = "tracing"))] + _marker: std::marker::PhantomData, +} + +impl TracingId { + /// Creates new `TracingId` from the pointer. + /// + /// # Safety + /// The pointer is used as a unique ID so users must ensure that they never create `TracingId` + /// with same address for different objects. + /// + /// The pointer does not need to point to valid memory. + pub const unsafe fn new(_id: *const T) -> Self { + Self { + #[cfg(feature = "tracing")] + id: _id, + #[cfg(not(feature = "tracing"))] + _marker: std::marker::PhantomData, + } + } + + /// Erase the generic type of the ID. + #[must_use] + pub fn forget_type(self) -> TracingId { + #[cfg(not(feature = "tracing"))] + unsafe { + // Safety: The ID cannot be obtained back without the `tracing` feature. + TracingId::new(std::ptr::null()) + } + #[cfg(feature = "tracing")] + unsafe { + // Safety: self contains valid ID. + TracingId::new(self.c_void()) + } + } + + /// Obtain the address representing the ID. + /// + /// # Safety + /// Do NOT dereference the pointer. + #[cfg(feature = "tracing")] + pub(crate) const unsafe fn c_void(self) -> *const std::ffi::c_void { + self.id.cast::() + } +} + +/// Deriving Clone for `TracingId` would only derive it only conditionally based on whether the +/// `T` is `Clone` or not. But TracingId is independent of T. +impl Clone for TracingId { + fn clone(&self) -> Self { + *self + } +} + +/// Deriving Copy for `TracingId` would only derive it only conditionally based on whether the +/// `T` is `Copy` or not. But TracingId is independent of T. +impl Copy for TracingId {} + +/// # Safety +/// +/// The address is never dereferenced and is used only as a unique ID so it is safe to send to another thread. +unsafe impl Send for TracingId {} + +/// # Safety +/// +/// The `TracingId` does not allow interior mutability because the pointer is never dereferenced. +/// It is safe to use across multiple threads. +unsafe impl Sync for TracingId {}