Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 83 additions & 4 deletions r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,13 @@ impl Node {
.register_parameters("", None, &mut self.params.lock().unwrap())?;
}
let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new();
let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);

let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
let mut set_atomically_event_tx = set_event_tx.clone();

let node_name = self.name()?;

// rcl_interfaces/srv/SetParameters
let set_params_request_stream = self
.create_service::<rcl_interfaces::srv::SetParameters::Service>(
&format!("{}/set_parameters", node_name),
Expand Down Expand Up @@ -335,7 +339,7 @@ impl Node {
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) = event_tx.try_send((p.name.clone(), val)) {
if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
log::debug!("Warning: could not send parameter event ({}).", e);
}
}
Expand Down Expand Up @@ -449,6 +453,77 @@ impl Node {

handlers.push(Box::pin(get_param_types_future));

// rcl_interfaces/srv/SetParametersAtomically

// NOTE: This is not a proper implementation of the specs, but rather a copy of set_parameters.
// On error, some of the parameters might already be set.
let set_params_atomically_request_stream =
self.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
&format!("{}/set_parameters_atomically", node_name),
QosProfile::default(),
)?;

let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_atomically_future = set_params_atomically_request_stream.for_each(
move |req: ServiceRequest<rcl_interfaces::srv::SetParametersAtomically::Service>| {
let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default();
result.result.successful = true;
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
let changed = params
.lock()
.unwrap()
.get(&p.name)
.map(|v| v.value != val)
.unwrap_or(true); // changed=true if new
let r = if let Some(ps) = &params_struct_clone {
// Update parameter structure
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
if result.is_ok() {
// Also update Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone());
}
rcl_interfaces::msg::SetParametersResult {
successful: result.is_ok(),
reason: result.err().map_or("".into(), |e| e.to_string()),
}
} else {
// No parameter structure - update only Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone())
.or_insert(Parameter::new(val.clone()));
rcl_interfaces::msg::SetParametersResult {
successful: true,
reason: "".into(),
}
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) {
log::debug!("Warning: could not send parameter event ({}).", e);
}
}
// if this parameter failed, set the result and break
if !r.successful {
result.result = r;
break;
}
}
req.respond(result)
.expect("could not send reply to set parameter request");
future::ready(())
},
);
handlers.push(Box::pin(set_params_atomically_future));

#[cfg(r2r__rosgraph_msgs__msg__Clock)]
{
// create TimeSource based on value of use_sim_time parameter
Expand Down Expand Up @@ -909,7 +984,9 @@ impl Node {
pub fn destroy_publisher<T: WrappedTypesupport>(&mut self, p: Publisher<T>) {
if let Some(handle) = p.handle.upgrade() {
// Remove handle from list of publishers.
self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
self.pubs
.iter()
.position(|p| Arc::ptr_eq(p, &handle))
.map(|i| self.pubs.swap_remove(i));

let handle = wait_until_unwrapped(handle);
Expand All @@ -921,7 +998,9 @@ impl Node {
pub fn destroy_publisher_untyped(&mut self, p: PublisherUntyped) {
if let Some(handle) = p.handle.upgrade() {
// Remove handle from list of publishers.
self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
self.pubs
.iter()
.position(|p| Arc::ptr_eq(p, &handle))
.map(|i| self.pubs.swap_remove(i));

let handle = wait_until_unwrapped(handle);
Expand Down