Skip to content

Commit

Permalink
feat(MultiThreadedExecutor): Added ability to handle exceptions from …
Browse files Browse the repository at this point in the history
…threads

This commit adds external exception handling for the worker threads,
allowing application code to implement custom exception handling.

feat(Executor): Added spin API with exception handler.


Signed-off-by: Janosch Machowinski <[email protected]>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Apr 12, 2024
1 parent dec22a2 commit 8611bbf
Show file tree
Hide file tree
Showing 16 changed files with 343 additions and 16 deletions.
24 changes: 24 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ class Executor
virtual void
spin() = 0;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
virtual void
spin(const std::function<void(const std::exception & e)> & exception_handler) = 0;

/// Add a callback group to an executor.
/**
* An executor can have zero or more callback groups which provide work during `spin` functions.
Expand Down Expand Up @@ -470,6 +482,18 @@ class Executor
void
execute_any_executable(AnyExecutable & any_exec);

/// Find the next available executable and do the work associated with it.
/**
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
* service, client).
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler);

/// Run subscription executable.
/**
* Do necessary setup and tear-down as well as executing the subscription.
Expand Down
16 changes: 15 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,28 @@ class MultiThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

RCLCPP_PUBLIC
size_t
get_number_of_threads();

protected:
RCLCPP_PUBLIC
void
run(size_t this_thread_number);
run(
size_t this_thread_number,
const std::function<void(const std::exception & e)> & exception_handler);

private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
Expand Down
12 changes: 12 additions & 0 deletions rclcpp/include/rclcpp/executors/single_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ class SingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/// Single-threaded implementation of spin.
/**
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/RCLCPP_PUBLIC
virtual void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Static executor implementation of spin some
/**
* This non-blocking function will execute entities that
Expand Down Expand Up @@ -125,6 +135,11 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin_once_impl(std::chrono::nanoseconds timeout) override;

void
spin_once_impl(
std::chrono::nanoseconds timeout,
const std::function<void(const std::exception & e)> & exception_handler);

std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
collect_and_wait(std::chrono::nanoseconds timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Events executor implementation of spin some
/**
* This non-blocking function will execute the timers and events
Expand Down
14 changes: 12 additions & 2 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,14 @@ class TimersManager
/**
* @brief Starts a thread that takes care of executing the timers stored in this object.
* Function will throw an error if the timers thread was already running.
*
* @param exception_handler if valid, the execution of the timer will be done in a try catch block,
* and any occurring exception will be passed to the given handler
*/
RCLCPP_PUBLIC
void start();
void start(
const std::function<void(const std::exception & e)> & exception_handler = std::function<void(
const std::exception & e)>());

/**
* @brief Stops the timers thread.
Expand Down Expand Up @@ -511,6 +516,11 @@ class TimersManager
*/
void run_timers();

/**
* @brief calls run_timers with a try catch block.
*/
void run_timers(const std::function<void(const std::exception & e)> & exception_handler);

/**
* @brief Get the amount of time before the next timer triggers.
* This function is not thread safe, acquire a mutex before calling it.
Expand All @@ -528,7 +538,7 @@ class TimersManager
* while keeping the heap correctly sorted.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
void execute_ready_timers_unsafe();
void execute_ready_timers_unsafe(std::function<void(const std::exception & e)> exception_handler);

// Callback to be called when timer is ready
std::function<void(const rclcpp::TimerBase *,
Expand Down
70 changes: 70 additions & 0 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,76 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
}
}

template<typename Function>
void execute_guarded(
const Function & fun,
const std::function<void(const std::exception & e)> & exception_handler)
{
try {
fun();
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

exception_handler(e);
}
}

void
Executor::execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler)
{
if (!spinning.load()) {
return;
}

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_guarded([&any_exec]() {
execute_timer(any_exec.timer, any_exec.data);
}, exception_handler);
}
if (any_exec.subscription) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
execute_guarded(
[&any_exec]() {
execute_subscription(any_exec.subscription);
}, exception_handler);
}
if (any_exec.service) {
execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler);
}
if (any_exec.client) {
execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler);
}
if (any_exec.waitable) {
execute_guarded([&any_exec]() {
const std::shared_ptr<void> & const_data = any_exec.data;
any_exec.waitable->execute(const_data);
}, exception_handler);
}
// Reset the callback_group, regardless of type
if(any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
}
}


template<typename Taker, typename Handler>
static
void
Expand Down
17 changes: 13 additions & 4 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}

void
MultiThreadedExecutor::spin()
{
spin([](const std::exception & e) {throw e;});
}

void

MultiThreadedExecutor::spin(const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand All @@ -61,12 +68,12 @@ MultiThreadedExecutor::spin()
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler);
threads.emplace_back(func);
}
}

run(thread_id);
run(thread_id, exception_handler);
for (auto & thread : threads) {
thread.join();
}
Expand All @@ -79,7 +86,9 @@ MultiThreadedExecutor::get_number_of_threads()
}

void
MultiThreadedExecutor::run(size_t this_thread_number)
MultiThreadedExecutor::run(
size_t this_thread_number,
const std::function<void(const std::exception & e)> & exception_handler)
{
(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
Expand All @@ -97,7 +106,7 @@ MultiThreadedExecutor::run(size_t this_thread_number)
std::this_thread::yield();
}

execute_any_executable(any_exec);
execute_any_executable(any_exec, exception_handler);

if (any_exec.callback_group &&
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
Expand Down
22 changes: 22 additions & 0 deletions rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,25 @@ SingleThreadedExecutor::spin()
}
}
}


void
SingleThreadedExecutor::spin(
const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
this->entities_need_rebuild_ = true;

while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable, exception_handler);
}
}
}
40 changes: 39 additions & 1 deletion rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ StaticSingleThreadedExecutor::spin()
}
}


void
StaticSingleThreadedExecutor::spin(
const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

// This is essentially the contents of the rclcpp::Executor::wait_for_work method,
// except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor
// behavior.
while (rclcpp::ok(this->context_) && spinning.load()) {
this->spin_once_impl(std::chrono::nanoseconds(-1), exception_handler);
}
}

void
StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
{
Expand Down Expand Up @@ -97,12 +115,32 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati

void
StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
{
spin_once_impl(timeout, std::function<void(const std::exception & e)>());
}

void
StaticSingleThreadedExecutor::spin_once_impl(
std::chrono::nanoseconds timeout,
const std::function<void(const std::exception & e)> & exception_handler)
{
if (rclcpp::ok(context_) && spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
auto wait_result = this->collect_and_wait(timeout);
if (wait_result.has_value()) {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
if(exception_handler) {
try {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

exception_handler(e);
}
} else {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
}
}
}
}
Expand Down
Loading

0 comments on commit 8611bbf

Please sign in to comment.