diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp index 7d8ddf96..78e81e51 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -34,11 +35,12 @@ namespace ros2_medkit_gateway::ros2 { /** * @brief rclcpp adapter implementing FaultServiceTransport. * - * Owns the seven `rclcpp::Client` instances and the - * seven per-client mutexes that previously lived inside FaultManager. Each - * mutex serialises one service direction so read operations (list, get) are - * not blocked by slow write operations (report_fault triggers snapshot - * capture inside the fault manager node). + * Owns seven `rclcpp::Client` instances created on a + * private `rclcpp::Node` plus a private `SingleThreadedExecutor` that drives + * that node. Each RPC blocks on `executor_->spin_until_future_complete()`, so + * the client's pending-request cleanup and the response destruction both run + * on the calling thread instead of racing with whatever executor spins the + * host gateway node. * * Performs the ros2_medkit_msgs <-> JSON translation internally, returning * neutral FaultResult and FaultWithEnvJsonResult structures so the FaultManager @@ -47,8 +49,10 @@ namespace ros2_medkit_gateway::ros2 { class Ros2FaultServiceTransport : public FaultServiceTransport { public: /** - * @param node Non-owning ROS node used for client creation and to derive - * the configured fault_manager namespace. + * @param node Non-owning ROS node used to read the service timeout + * parameter and to derive the configured fault_manager + * namespace. The service clients themselves live on a private + * node owned by this transport. */ explicit Ros2FaultServiceTransport(rclcpp::Node * node); @@ -85,6 +89,15 @@ class Ros2FaultServiceTransport : public FaultServiceTransport { private: rclcpp::Node * node_; + /// Private node hosting the fault service clients. Kept off the host node's + /// executor so the gateway's MultiThreadedExecutor never processes these + /// clients' responses concurrently with our synchronous spin. + std::shared_ptr client_node_; + + /// Private single-threaded executor that drives `client_node_`, spun inline + /// via `spin_until_future_complete()` on each RPC. + std::shared_ptr executor_; + rclcpp::Client::SharedPtr report_fault_client_; rclcpp::Client::SharedPtr get_fault_client_; rclcpp::Client::SharedPtr list_faults_client_; @@ -96,16 +109,11 @@ class Ros2FaultServiceTransport : public FaultServiceTransport { double service_timeout_sec_{5.0}; std::string fault_manager_base_path_{"/fault_manager"}; - /// Per-client mutexes for thread-safe service calls. - /// Split by service client so that read operations (list, get) are not blocked - /// by slow write operations (report_fault with snapshot capture). - mutable std::mutex report_mutex_; - mutable std::mutex list_mutex_; - mutable std::mutex get_mutex_; - mutable std::mutex clear_mutex_; - mutable std::mutex snapshots_mutex_; - mutable std::mutex rosbag_mutex_; - mutable std::mutex list_rosbags_mutex_; + /// Serialises access to `executor_`. `SingleThreadedExecutor::spin_until_future_complete()` + /// is not safe for concurrent callers on the same executor, so all RPCs take + /// this mutex for the duration of the synchronous spin. Replaces the seven + /// per-client mutexes that previously gated each method. + mutable std::mutex executor_mutex_; }; } // namespace ros2_medkit_gateway::ros2 diff --git a/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp b/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp index 1e531d63..accd57cd 100644 --- a/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp +++ b/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include "ros2_medkit_gateway/fault_manager_paths.hpp" @@ -35,26 +34,40 @@ Ros2FaultServiceTransport::Ros2FaultServiceTransport(rclcpp::Node * node) : node } fault_manager_base_path_ = build_fault_manager_base_path(node_); + // The fault service clients live on a private node driven by a private + // executor. The host gateway node's MultiThreadedExecutor therefore never + // processes these clients, so the client's pending-request cleanup cannot + // race against the calling thread destroying the response shared_ptr - both + // happen inline on the caller's thread inside spin_until_future_complete(). + client_node_ = std::make_shared(std::string(node_->get_name()) + "_fault_clients"); + executor_ = std::make_shared(); + executor_->add_node(client_node_); + report_fault_client_ = - node_->create_client(fault_manager_base_path_ + "/report_fault"); - get_fault_client_ = node_->create_client(fault_manager_base_path_ + "/get_fault"); + client_node_->create_client(fault_manager_base_path_ + "/report_fault"); + get_fault_client_ = + client_node_->create_client(fault_manager_base_path_ + "/get_fault"); list_faults_client_ = - node_->create_client(fault_manager_base_path_ + "/list_faults"); + client_node_->create_client(fault_manager_base_path_ + "/list_faults"); clear_fault_client_ = - node_->create_client(fault_manager_base_path_ + "/clear_fault"); + client_node_->create_client(fault_manager_base_path_ + "/clear_fault"); get_snapshots_client_ = - node_->create_client(fault_manager_base_path_ + "/get_snapshots"); - get_rosbag_client_ = node_->create_client(fault_manager_base_path_ + "/get_rosbag"); + client_node_->create_client(fault_manager_base_path_ + "/get_snapshots"); + get_rosbag_client_ = + client_node_->create_client(fault_manager_base_path_ + "/get_rosbag"); list_rosbags_client_ = - node_->create_client(fault_manager_base_path_ + "/list_rosbags"); + client_node_->create_client(fault_manager_base_path_ + "/list_rosbags"); RCLCPP_INFO(node_->get_logger(), "Ros2FaultServiceTransport initialized (base_path=%s, timeout=%.1fs)", fault_manager_base_path_.c_str(), service_timeout_sec_); } Ros2FaultServiceTransport::~Ros2FaultServiceTransport() { - // Reset clients before implicit member destruction so any in-flight - // future-callback paths drop their references in a defined order. + // Tear down under the executor mutex so any in-flight RPC has finished. Drop + // the clients first, then the executor, then the node, so the executor never + // references a freed node. + std::lock_guard lock(executor_mutex_); + report_fault_client_.reset(); get_fault_client_.reset(); list_faults_client_.reset(); @@ -62,6 +75,9 @@ Ros2FaultServiceTransport::~Ros2FaultServiceTransport() { get_snapshots_client_.reset(); get_rosbag_client_.reset(); list_rosbags_client_.reset(); + + executor_.reset(); + client_node_.reset(); } bool Ros2FaultServiceTransport::wait_for_services(std::chrono::duration timeout) { @@ -76,15 +92,8 @@ bool Ros2FaultServiceTransport::is_available() const { FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_code, uint8_t severity, const std::string & description, const std::string & source_id) { - std::lock_guard lock(report_mutex_); FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!report_fault_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ReportFault service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; @@ -93,15 +102,23 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co request->description = description; request->source_id = source_id; - auto future = report_fault_client_->async_send_request(request); - - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "ReportFault service call timed out"; - return result; + ros2_medkit_msgs::srv::ReportFault::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!report_fault_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ReportFault service not available"; + return result; + } + auto future = report_fault_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ReportFault service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); result.success = response->accepted; result.data = {{"accepted", response->accepted}}; if (!response->accepted) { @@ -114,15 +131,8 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id, bool include_prefailed, bool include_confirmed, bool include_cleared, bool include_healed, bool include_muted, bool include_clusters) { - std::lock_guard lock(list_mutex_); FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!list_faults_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ListFaults service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->filter_by_severity = false; @@ -145,16 +155,23 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id request->include_muted = include_muted; request->include_clusters = include_clusters; - auto future = list_faults_client_->async_send_request(request); - - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "ListFaults service call timed out"; - return result; + ros2_medkit_msgs::srv::ListFaults::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!list_faults_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ListFaults service not available"; + return result; + } + auto future = list_faults_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ListFaults service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); - // Filter by source_id if provided (uses prefix matching) json faults_array = json::array(); for (const auto & fault : response->faults) { @@ -216,28 +233,29 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id FaultWithEnvJsonResult Ros2FaultServiceTransport::get_fault_with_env(const std::string & fault_code, const std::string & source_id) { - std::lock_guard lock(get_mutex_); FaultWithEnvJsonResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!get_fault_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "GetFault service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; - auto future = get_fault_client_->async_send_request(request); - - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "GetFault service call timed out"; - return result; + ros2_medkit_msgs::srv::GetFault::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!get_fault_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "GetFault service not available"; + return result; + } + auto future = get_fault_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "GetFault service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); if (!response->success) { result.success = false; result.error_message = response->error_message; @@ -282,28 +300,29 @@ FaultResult Ros2FaultServiceTransport::get_fault(const std::string & fault_code, } FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_code) { - std::lock_guard lock(clear_mutex_); FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!clear_fault_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ClearFault service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; - auto future = clear_fault_client_->async_send_request(request); - - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "ClearFault service call timed out"; - return result; + ros2_medkit_msgs::srv::ClearFault::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!clear_fault_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ClearFault service not available"; + return result; + } + auto future = clear_fault_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ClearFault service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); result.success = response->success; result.data = {{"success", response->success}, {"message", response->message}}; if (!response->success) { @@ -318,29 +337,30 @@ FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_cod } FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_code, const std::string & topic) { - std::lock_guard lock(snapshots_mutex_); FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!get_snapshots_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "GetSnapshots service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; request->topic = topic; - auto future = get_snapshots_client_->async_send_request(request); - - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "GetSnapshots service call timed out"; - return result; + ros2_medkit_msgs::srv::GetSnapshots::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!get_snapshots_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "GetSnapshots service not available"; + return result; + } + auto future = get_snapshots_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "GetSnapshots service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); result.success = response->success; if (response->success) { @@ -361,28 +381,29 @@ FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_c } FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code) { - std::lock_guard lock(rosbag_mutex_); FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!get_rosbag_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "GetRosbag service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; - auto future = get_rosbag_client_->async_send_request(request); - - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "GetRosbag service call timed out"; - return result; + ros2_medkit_msgs::srv::GetRosbag::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!get_rosbag_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "GetRosbag service not available"; + return result; + } + auto future = get_rosbag_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "GetRosbag service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); result.success = response->success; if (response->success) { @@ -398,28 +419,29 @@ FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code } FaultResult Ros2FaultServiceTransport::list_rosbags(const std::string & entity_fqn) { - std::lock_guard lock(list_rosbags_mutex_); FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!list_rosbags_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ListRosbags service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->entity_fqn = entity_fqn; - auto future = list_rosbags_client_->async_send_request(request); - - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "ListRosbags service call timed out"; - return result; + ros2_medkit_msgs::srv::ListRosbags::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!list_rosbags_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ListRosbags service not available"; + return result; + } + auto future = list_rosbags_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ListRosbags service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); result.success = response->success; if (response->success) { diff --git a/src/ros2_medkit_gateway/test/test_fault_manager.cpp b/src/ros2_medkit_gateway/test/test_fault_manager.cpp index 99569ae1..1631e54b 100644 --- a/src/ros2_medkit_gateway/test/test_fault_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_fault_manager.cpp @@ -168,6 +168,40 @@ TEST_F(FaultManagerTest, GetSnapshotsSuccessWithValidJson) { EXPECT_TRUE(result.data["topics"].contains("/joint_states")); } +// Regression guard: Ros2FaultServiceTransport must drive its service clients +// on its own private executor. The transport's host node (node_) is left +// unspun here - the mock service runs on a separate node with its own spin +// thread - so the RPC can only complete if the transport spins its private +// client node itself. +TEST_F(FaultManagerTest, GetSnapshotsDrivesPrivateClientWithoutSpinningHostNode) { + auto service_node = std::make_shared("mock_fault_service_node_" + std::to_string(test_counter_++)); + auto service = service_node->create_service( + "/fault_manager/get_snapshots", [](const std::shared_ptr & request, + const std::shared_ptr & response) { + response->success = true; + nlohmann::json snapshot_data; + snapshot_data["fault_code"] = request->fault_code; + response->data = snapshot_data.dump(); + }); + + rclcpp::executors::SingleThreadedExecutor service_executor; + service_executor.add_node(service_node); + std::thread service_thread([&service_executor]() { + service_executor.spin(); + }); + + // node_ is deliberately never spun by this test. + FaultManager fault_manager(std::make_shared(node_.get())); + auto result = fault_manager.get_snapshots("SELF_DRIVEN_FAULT"); + + service_executor.cancel(); + service_thread.join(); + + EXPECT_TRUE(result.success); + EXPECT_TRUE(result.error_message.empty()); + EXPECT_EQ(result.data["fault_code"], "SELF_DRIVEN_FAULT"); +} + // @verifies REQ_INTEROP_088 TEST_F(FaultManagerTest, GetSnapshotsUsesConfiguredFaultManagerNamespace) { node_ = std::make_shared(