Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/check_no_naked_subscriptions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ ALLOWED_LEGACY_FILES=(
"${FAULT_MANAGER_ROOT}/src/snapshot_capture.cpp" # uses LockedSubscriptionGuard (in-place serialisation)
"${FAULT_MANAGER_ROOT}/include/ros2_medkit_fault_manager/snapshot_capture.hpp" # comment references the guarded API
"${FAULT_MANAGER_ROOT}/src/rosbag_capture.cpp" # bag-recorder spawns its own node + executor, no shared rcl hash map
"${GATEWAY_ROOT}/src/ros2/transports/ros2_fault_service_transport.cpp" # issue #399: single-threaded callback group at construction for synchronous service-client spin
)
Comment thread
bburda marked this conversation as resolved.

LEGACY_PATTERN=""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <rclcpp/rclcpp.hpp>
#include <string>
Expand All @@ -34,11 +35,12 @@ namespace ros2_medkit_gateway::ros2 {
/**
* @brief rclcpp adapter implementing FaultServiceTransport.
*
* Owns the seven `rclcpp::Client<ros2_medkit_msgs::srv::*>` instances and the
* seven per-client mutexes that previously lived inside FaultManager. Each
* mutex serialises one service direction so read operations (list, get) are
* not blocked by slow write operations (report_fault triggers snapshot
* capture inside the fault manager node).
* Owns seven `rclcpp::Client<ros2_medkit_msgs::srv::*>` instances scoped to a
* dedicated `MutuallyExclusive` callback group, plus a private
* `SingleThreadedExecutor` that drives that group. Each RPC blocks on
* `executor_.spin_until_future_complete()` so the client's pending-request
Comment thread
bburda marked this conversation as resolved.
Outdated
* cleanup and the response destruction happen on the calling thread instead
* of racing with whichever external executor spins the host node.
*
* Performs the ros2_medkit_msgs <-> JSON translation internally, returning
* neutral FaultResult and FaultWithEnvJsonResult structures so the FaultManager
Expand Down Expand Up @@ -85,6 +87,15 @@ class Ros2FaultServiceTransport : public FaultServiceTransport {
private:
rclcpp::Node * node_;

/// Dedicated callback group for all fault service clients. Created with
/// `automatically_add_to_executor_with_node = false` so the host node's
/// executor does not spin these clients - we drive them ourselves.
rclcpp::CallbackGroup::SharedPtr callback_group_;

/// Private single-threaded executor that owns `callback_group_` and is
/// driven inline via `spin_until_future_complete()` on each RPC.
std::shared_ptr<rclcpp::executors::SingleThreadedExecutor> executor_;

rclcpp::Client<ros2_medkit_msgs::srv::ReportFault>::SharedPtr report_fault_client_;
rclcpp::Client<ros2_medkit_msgs::srv::GetFault>::SharedPtr get_fault_client_;
rclcpp::Client<ros2_medkit_msgs::srv::ListFaults>::SharedPtr list_faults_client_;
Expand All @@ -96,16 +107,11 @@ class Ros2FaultServiceTransport : public FaultServiceTransport {
double service_timeout_sec_{5.0};
std::string fault_manager_base_path_{"/fault_manager"};

/// Per-client mutexes for thread-safe service calls.
/// Split by service client so that read operations (list, get) are not blocked
/// by slow write operations (report_fault with snapshot capture).
mutable std::mutex report_mutex_;
mutable std::mutex list_mutex_;
mutable std::mutex get_mutex_;
mutable std::mutex clear_mutex_;
mutable std::mutex snapshots_mutex_;
mutable std::mutex rosbag_mutex_;
mutable std::mutex list_rosbags_mutex_;
/// Serialises access to `executor_`. `SingleThreadedExecutor::spin_until_future_complete()`
/// is not safe for concurrent callers on the same executor, so all RPCs take
/// this mutex for the duration of the synchronous spin. Replaces the seven
/// per-client mutexes that previously gated each method.
mutable std::mutex executor_mutex_;
};

} // namespace ros2_medkit_gateway::ros2
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <algorithm>
#include <builtin_interfaces/msg/time.hpp>
#include <chrono>
#include <future>
#include <string>

#include "ros2_medkit_gateway/fault_manager_paths.hpp"
Expand All @@ -35,33 +34,54 @@ Ros2FaultServiceTransport::Ros2FaultServiceTransport(rclcpp::Node * node) : node
}
fault_manager_base_path_ = build_fault_manager_base_path(node_);

report_fault_client_ =
node_->create_client<ros2_medkit_msgs::srv::ReportFault>(fault_manager_base_path_ + "/report_fault");
get_fault_client_ = node_->create_client<ros2_medkit_msgs::srv::GetFault>(fault_manager_base_path_ + "/get_fault");
list_faults_client_ =
node_->create_client<ros2_medkit_msgs::srv::ListFaults>(fault_manager_base_path_ + "/list_faults");
clear_fault_client_ =
node_->create_client<ros2_medkit_msgs::srv::ClearFault>(fault_manager_base_path_ + "/clear_fault");
get_snapshots_client_ =
node_->create_client<ros2_medkit_msgs::srv::GetSnapshots>(fault_manager_base_path_ + "/get_snapshots");
get_rosbag_client_ = node_->create_client<ros2_medkit_msgs::srv::GetRosbag>(fault_manager_base_path_ + "/get_rosbag");
list_rosbags_client_ =
node_->create_client<ros2_medkit_msgs::srv::ListRosbags>(fault_manager_base_path_ + "/list_rosbags");
// Dedicated callback group for all fault clients. `false` keeps it out of
// the host node's main executor so the production gateway's
// MultiThreadedExecutor cannot race with our synchronous spin on the
// pending-request cleanup path (see issue #399).
callback_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);

executor_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor_->add_callback_group(callback_group_, node_->get_node_base_interface());

// Portable across Humble/Jazzy/Rolling - `rclcpp::ServicesQoS` only exists from Iron onwards.
const auto service_qos = rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_services_default));

report_fault_client_ = node_->create_client<ros2_medkit_msgs::srv::ReportFault>(
fault_manager_base_path_ + "/report_fault", service_qos, callback_group_);
get_fault_client_ = node_->create_client<ros2_medkit_msgs::srv::GetFault>(fault_manager_base_path_ + "/get_fault",
service_qos, callback_group_);
list_faults_client_ = node_->create_client<ros2_medkit_msgs::srv::ListFaults>(
fault_manager_base_path_ + "/list_faults", service_qos, callback_group_);
clear_fault_client_ = node_->create_client<ros2_medkit_msgs::srv::ClearFault>(
fault_manager_base_path_ + "/clear_fault", service_qos, callback_group_);
get_snapshots_client_ = node_->create_client<ros2_medkit_msgs::srv::GetSnapshots>(
fault_manager_base_path_ + "/get_snapshots", service_qos, callback_group_);
get_rosbag_client_ = node_->create_client<ros2_medkit_msgs::srv::GetRosbag>(fault_manager_base_path_ + "/get_rosbag",
service_qos, callback_group_);
list_rosbags_client_ = node_->create_client<ros2_medkit_msgs::srv::ListRosbags>(
fault_manager_base_path_ + "/list_rosbags", service_qos, callback_group_);

RCLCPP_INFO(node_->get_logger(), "Ros2FaultServiceTransport initialized (base_path=%s, timeout=%.1fs)",
fault_manager_base_path_.c_str(), service_timeout_sec_);
}

Ros2FaultServiceTransport::~Ros2FaultServiceTransport() {
// Reset clients before implicit member destruction so any in-flight
// future-callback paths drop their references in a defined order.
// Tear down under the executor mutex so any in-flight RPC sees a consistent
// view, then drop the executor before the clients/callback group so it can
// unregister cleanly while the underlying entities still exist.
std::lock_guard<std::mutex> lock(executor_mutex_);

executor_.reset();

report_fault_client_.reset();
get_fault_client_.reset();
list_faults_client_.reset();
clear_fault_client_.reset();
get_snapshots_client_.reset();
get_rosbag_client_.reset();
list_rosbags_client_.reset();

callback_group_.reset();
}

bool Ros2FaultServiceTransport::wait_for_services(std::chrono::duration<double> timeout) {
Expand All @@ -76,7 +96,6 @@ bool Ros2FaultServiceTransport::is_available() const {

FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_code, uint8_t severity,
const std::string & description, const std::string & source_id) {
std::lock_guard<std::mutex> lock(report_mutex_);
FaultResult result;

auto timeout = std::chrono::duration<double>(service_timeout_sec_);
Expand All @@ -93,9 +112,10 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co
request->description = description;
request->source_id = source_id;

std::lock_guard<std::mutex> lock(executor_mutex_);
auto future = report_fault_client_->async_send_request(request);

if (future.wait_for(timeout) != std::future_status::ready) {
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
result.success = false;
result.error_message = "ReportFault service call timed out";
return result;
Comment thread
bburda marked this conversation as resolved.
Outdated
Expand All @@ -114,7 +134,6 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co
FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id, bool include_prefailed,
bool include_confirmed, bool include_cleared, bool include_healed,
bool include_muted, bool include_clusters) {
std::lock_guard<std::mutex> lock(list_mutex_);
FaultResult result;

auto timeout = std::chrono::duration<double>(service_timeout_sec_);
Expand Down Expand Up @@ -145,15 +164,19 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id
request->include_muted = include_muted;
request->include_clusters = include_clusters;

auto future = list_faults_client_->async_send_request(request);
std::shared_ptr<ros2_medkit_msgs::srv::ListFaults::Response> response;
{
std::lock_guard<std::mutex> lock(executor_mutex_);
auto future = list_faults_client_->async_send_request(request);

if (future.wait_for(timeout) != std::future_status::ready) {
result.success = false;
result.error_message = "ListFaults service call timed out";
return result;
}
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
result.success = false;
result.error_message = "ListFaults service call timed out";
return result;
}

auto response = future.get();
response = future.get();
}

// Filter by source_id if provided (uses prefix matching)
json faults_array = json::array();
Expand Down Expand Up @@ -216,7 +239,6 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id

FaultWithEnvJsonResult Ros2FaultServiceTransport::get_fault_with_env(const std::string & fault_code,
const std::string & source_id) {
std::lock_guard<std::mutex> lock(get_mutex_);
FaultWithEnvJsonResult result;

auto timeout = std::chrono::duration<double>(service_timeout_sec_);
Expand All @@ -229,15 +251,20 @@ FaultWithEnvJsonResult Ros2FaultServiceTransport::get_fault_with_env(const std::
auto request = std::make_shared<ros2_medkit_msgs::srv::GetFault::Request>();
request->fault_code = fault_code;

auto future = get_fault_client_->async_send_request(request);
std::shared_ptr<ros2_medkit_msgs::srv::GetFault::Response> response;
{
std::lock_guard<std::mutex> lock(executor_mutex_);
auto future = get_fault_client_->async_send_request(request);

if (future.wait_for(timeout) != std::future_status::ready) {
result.success = false;
result.error_message = "GetFault service call timed out";
return result;
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
result.success = false;
result.error_message = "GetFault service call timed out";
return result;
}

response = future.get();
}

auto response = future.get();
if (!response->success) {
result.success = false;
result.error_message = response->error_message;
Expand Down Expand Up @@ -282,7 +309,6 @@ FaultResult Ros2FaultServiceTransport::get_fault(const std::string & fault_code,
}

FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_code) {
std::lock_guard<std::mutex> lock(clear_mutex_);
FaultResult result;

auto timeout = std::chrono::duration<double>(service_timeout_sec_);
Expand All @@ -295,9 +321,10 @@ FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_cod
auto request = std::make_shared<ros2_medkit_msgs::srv::ClearFault::Request>();
request->fault_code = fault_code;

std::lock_guard<std::mutex> lock(executor_mutex_);
auto future = clear_fault_client_->async_send_request(request);

if (future.wait_for(timeout) != std::future_status::ready) {
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
result.success = false;
result.error_message = "ClearFault service call timed out";
return result;
Expand All @@ -318,7 +345,6 @@ FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_cod
}

FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_code, const std::string & topic) {
std::lock_guard<std::mutex> lock(snapshots_mutex_);
FaultResult result;

auto timeout = std::chrono::duration<double>(service_timeout_sec_);
Expand All @@ -332,15 +358,20 @@ FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_c
request->fault_code = fault_code;
request->topic = topic;

auto future = get_snapshots_client_->async_send_request(request);
std::shared_ptr<ros2_medkit_msgs::srv::GetSnapshots::Response> response;
{
std::lock_guard<std::mutex> lock(executor_mutex_);
auto future = get_snapshots_client_->async_send_request(request);

if (future.wait_for(timeout) != std::future_status::ready) {
result.success = false;
result.error_message = "GetSnapshots service call timed out";
return result;
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
result.success = false;
result.error_message = "GetSnapshots service call timed out";
return result;
}

response = future.get();
}

auto response = future.get();
result.success = response->success;

if (response->success) {
Expand All @@ -361,7 +392,6 @@ FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_c
}

FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code) {
std::lock_guard<std::mutex> lock(rosbag_mutex_);
FaultResult result;

auto timeout = std::chrono::duration<double>(service_timeout_sec_);
Expand All @@ -374,15 +404,20 @@ FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code
auto request = std::make_shared<ros2_medkit_msgs::srv::GetRosbag::Request>();
request->fault_code = fault_code;

auto future = get_rosbag_client_->async_send_request(request);
std::shared_ptr<ros2_medkit_msgs::srv::GetRosbag::Response> response;
{
std::lock_guard<std::mutex> lock(executor_mutex_);
auto future = get_rosbag_client_->async_send_request(request);

if (future.wait_for(timeout) != std::future_status::ready) {
result.success = false;
result.error_message = "GetRosbag service call timed out";
return result;
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
result.success = false;
result.error_message = "GetRosbag service call timed out";
return result;
}

response = future.get();
}

auto response = future.get();
result.success = response->success;

if (response->success) {
Expand All @@ -398,7 +433,6 @@ FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code
}

FaultResult Ros2FaultServiceTransport::list_rosbags(const std::string & entity_fqn) {
std::lock_guard<std::mutex> lock(list_rosbags_mutex_);
FaultResult result;

auto timeout = std::chrono::duration<double>(service_timeout_sec_);
Expand All @@ -411,15 +445,20 @@ FaultResult Ros2FaultServiceTransport::list_rosbags(const std::string & entity_f
auto request = std::make_shared<ros2_medkit_msgs::srv::ListRosbags::Request>();
request->entity_fqn = entity_fqn;

auto future = list_rosbags_client_->async_send_request(request);
std::shared_ptr<ros2_medkit_msgs::srv::ListRosbags::Response> response;
{
std::lock_guard<std::mutex> lock(executor_mutex_);
auto future = list_rosbags_client_->async_send_request(request);

if (future.wait_for(timeout) != std::future_status::ready) {
result.success = false;
result.error_message = "ListRosbags service call timed out";
return result;
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
result.success = false;
result.error_message = "ListRosbags service call timed out";
return result;
}

response = future.get();
}

auto response = future.get();
result.success = response->success;

if (response->success) {
Expand Down
Loading