diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 119013ebfb..b63ea2c41b 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -69,6 +69,18 @@ class MultiThreadedExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(std::function exception_handler); + RCLCPP_PUBLIC size_t get_number_of_threads(); @@ -76,7 +88,7 @@ class MultiThreadedExecutor : public rclcpp::Executor protected: RCLCPP_PUBLIC void - run(size_t this_thread_number); + run(size_t this_thread_number, std::function exception_handler); private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 507d47f913..c3f025e99d 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -51,6 +51,13 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {} void MultiThreadedExecutor::spin() +{ + spin(std::function()); +} + +void + +MultiThreadedExecutor::spin(std::function exception_handler) { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); @@ -61,12 +68,12 @@ MultiThreadedExecutor::spin() { std::lock_guard wait_lock{wait_mutex_}; for (; thread_id < number_of_threads_ - 1; ++thread_id) { - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler); threads.emplace_back(func); } } - run(thread_id); + run(thread_id, exception_handler); for (auto & thread : threads) { thread.join(); } @@ -79,28 +86,48 @@ MultiThreadedExecutor::get_number_of_threads() } void -MultiThreadedExecutor::run(size_t this_thread_number) +MultiThreadedExecutor::run( + size_t this_thread_number, + std::function exception_handler) { (void)this_thread_number; - while (rclcpp::ok(this->context_) && spinning.load()) { - rclcpp::AnyExecutable any_exec; + const auto run_inner = [this]() { - std::lock_guard wait_lock{wait_mutex_}; - if (!rclcpp::ok(this->context_) || !spinning.load()) { - return; - } - if (!get_next_executable(any_exec, next_exec_timeout_)) { - continue; + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_exec; + { + std::lock_guard wait_lock{wait_mutex_}; + if (!rclcpp::ok(this->context_) || !spinning.load()) { + return; + } + if (!get_next_executable(any_exec, next_exec_timeout_)) { + continue; + } + } + if (yield_before_execute_) { + std::this_thread::yield(); + } + + execute_any_executable(any_exec); + + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); } - } - if (yield_before_execute_) { - std::this_thread::yield(); - } + }; - execute_any_executable(any_exec); + if (exception_handler) { + try { + run_inner(); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); - // Clear the callback_group to prevent the AnyExecutable destructor from - // resetting the callback group `can_be_taken_from` - any_exec.callback_group.reset(); + exception_handler(e); + } + } else { + run_inner(); } + }