diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index c797f1df6..6d46957d6 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -283,9 +283,13 @@ impl Node { .register_parameters("", None, &mut self.params.lock().unwrap())?; } let mut handlers: Vec + Send>>> = Vec::new(); - let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10); + + let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10); + let mut set_atomically_event_tx = set_event_tx.clone(); let node_name = self.name()?; + + // rcl_interfaces/srv/SetParameters let set_params_request_stream = self .create_service::( &format!("{}/set_parameters", node_name), @@ -335,7 +339,7 @@ impl Node { }; // if the value changed, send out new value on parameter event stream if changed && r.successful { - if let Err(e) = event_tx.try_send((p.name.clone(), val)) { + if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) { log::debug!("Warning: could not send parameter event ({}).", e); } } @@ -449,6 +453,77 @@ impl Node { handlers.push(Box::pin(get_param_types_future)); + // rcl_interfaces/srv/SetParametersAtomically + + // NOTE: This is not a proper implementation of the specs, but rather a copy of set_parameters. + // On error, some of the parameters might already be set. + let set_params_atomically_request_stream = + self.create_service::( + &format!("{}/set_parameters_atomically", node_name), + QosProfile::default(), + )?; + + let params = self.params.clone(); + let params_struct_clone = params_struct.clone(); + let set_params_atomically_future = set_params_atomically_request_stream.for_each( + move |req: ServiceRequest| { + let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default(); + result.result.successful = true; + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + let changed = params + .lock() + .unwrap() + .get(&p.name) + .map(|v| v.value != val) + .unwrap_or(true); // changed=true if new + let r = if let Some(ps) = ¶ms_struct_clone { + // Update parameter structure + let result = ps.lock().unwrap().set_parameter(&p.name, &val); + if result.is_ok() { + // Also update Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()); + } + rcl_interfaces::msg::SetParametersResult { + successful: result.is_ok(), + reason: result.err().map_or("".into(), |e| e.to_string()), + } + } else { + // No parameter structure - update only Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()) + .or_insert(Parameter::new(val.clone())); + rcl_interfaces::msg::SetParametersResult { + successful: true, + reason: "".into(), + } + }; + // if the value changed, send out new value on parameter event stream + if changed && r.successful { + if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) { + log::debug!("Warning: could not send parameter event ({}).", e); + } + } + // if this parameter failed, set the result and break + if !r.successful { + result.result = r; + break; + } + } + req.respond(result) + .expect("could not send reply to set parameter request"); + future::ready(()) + }, + ); + handlers.push(Box::pin(set_params_atomically_future)); + #[cfg(r2r__rosgraph_msgs__msg__Clock)] { // create TimeSource based on value of use_sim_time parameter @@ -909,7 +984,9 @@ impl Node { pub fn destroy_publisher(&mut self, p: Publisher) { if let Some(handle) = p.handle.upgrade() { // Remove handle from list of publishers. - self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle)) + self.pubs + .iter() + .position(|p| Arc::ptr_eq(p, &handle)) .map(|i| self.pubs.swap_remove(i)); let handle = wait_until_unwrapped(handle); @@ -921,7 +998,9 @@ impl Node { pub fn destroy_publisher_untyped(&mut self, p: PublisherUntyped) { if let Some(handle) = p.handle.upgrade() { // Remove handle from list of publishers. - self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle)) + self.pubs + .iter() + .position(|p| Arc::ptr_eq(p, &handle)) .map(|i| self.pubs.swap_remove(i)); let handle = wait_until_unwrapped(handle);