diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 7899a35233..279c642c0e 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -152,10 +152,6 @@ class CallbackGroup bool has_valid_node(); - RCLCPP_PUBLIC - bool - has_valid_node(); - RCLCPP_PUBLIC std::atomic_bool & can_be_taken_from(); diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 3a4af267ec..fcbdfe8d7b 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -30,6 +30,7 @@ #include "rcl/guard_condition.h" #include "rcl/wait.h" +#include "rclcpp/executors/executor_notify_waitable.hpp" #include "rcpputils/scope_exit.hpp" #include "rclcpp/context.hpp" @@ -484,6 +485,10 @@ class Executor static void execute_client(rclcpp::ClientBase::SharedPtr client); + RCLCPP_PUBLIC + void + collect_entities(); + /// Block until more work becomes avilable or timeout is reached. /** * Builds a set of waitable entities, which are passed to the middleware. @@ -541,6 +546,7 @@ class Executor virtual void spin_once_impl(std::chrono::nanoseconds timeout); + std::shared_ptr notify_waitable_; rclcpp::executors::ExecutorEntitiesCollector collector_; rclcpp::executors::ExecutorEntitiesCollection current_collection_; diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index b3f41f8ed5..c139913be8 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -661,6 +661,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli this->storage_acquire_ownerships(); RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();}); + // this method comes from the SynchronizationPolicy return this->template sync_wait>( // pass along the time_to_wait duration as nanoseconds diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 23d5695c89..e6001fa8e8 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -23,6 +23,7 @@ #include "rcl/allocator.h" #include "rcl/error_handling.h" +#include "rclcpp/executors/executor_notify_waitable.hpp" #include "rclcpp/subscription_wait_set_mask.hpp" #include "rcpputils/scope_exit.hpp" @@ -46,6 +47,11 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(options.context), shutdown_guard_condition_(std::make_shared(options.context)), + notify_waitable_(std::make_shared( + [this]() { + this->collect_entities(); + })), + collector_(notify_waitable_), wait_set_(std::make_shared()) { // Store the context for later use. @@ -59,39 +65,45 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) } }); - this->collector_.get_executor_notify_waitable()->add_guard_condition( - &interrupt_guard_condition_); - this->collector_.get_executor_notify_waitable()->add_guard_condition( - shutdown_guard_condition_.get()); + notify_waitable_->add_guard_condition(&interrupt_guard_condition_); + notify_waitable_->add_guard_condition(shutdown_guard_condition_.get()); } Executor::~Executor() { - current_collection_.update_timers({}, - [this](auto timer){wait_set_->add_timer(timer);}, - [this](auto timer){wait_set_->remove_timer(timer);}); - - current_collection_.update_subscriptions({}, - [this](auto subscription){ - wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);}, - [this](auto subscription){ - wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);}); + current_collection_.timers.update( + {}, + [this](auto timer) {wait_set_->add_timer(timer);}, + [this](auto timer) {wait_set_->remove_timer(timer);}); + + current_collection_.subscriptions.update( + {}, + [this](auto subscription) { + wait_set_->add_subscription(subscription, kDefaultSubscriptionMask); + }, + [this](auto subscription) { + wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask); + }); - current_collection_.update_clients({}, - [this](auto client){wait_set_->add_client(client);}, - [this](auto client){wait_set_->remove_client(client);}); + current_collection_.clients.update( + {}, + [this](auto client) {wait_set_->add_client(client);}, + [this](auto client) {wait_set_->remove_client(client);}); - current_collection_.update_services({}, - [this](auto service){wait_set_->add_service(service);}, - [this](auto service){wait_set_->remove_service(service);}); + current_collection_.services.update( + {}, + [this](auto service) {wait_set_->add_service(service);}, + [this](auto service) {wait_set_->remove_service(service);}); - current_collection_.update_guard_conditions({}, - [this](auto guard_condition){wait_set_->add_guard_condition(guard_condition);}, - [this](auto guard_condition){wait_set_->remove_guard_condition(guard_condition);}); + current_collection_.guard_conditions.update( + {}, + [this](auto guard_condition) {wait_set_->add_guard_condition(guard_condition);}, + [this](auto guard_condition) {wait_set_->remove_guard_condition(guard_condition);}); - current_collection_.update_waitables({}, - [this](auto waitable){wait_set_->add_waitable(waitable);}, - [this](auto waitable){wait_set_->remove_waitable(waitable);}); + current_collection_.waitables.update( + {}, + [this](auto waitable) {wait_set_->add_waitable(waitable);}, + [this](auto waitable) {wait_set_->remove_waitable(waitable);}); // Remove shutdown callback handle registered to Context if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) { @@ -129,15 +141,14 @@ Executor::add_callback_group( (void) node_ptr; this->collector_.add_callback_group(group_ptr); - if (notify) - { + if (notify) { // Interrupt waiting to handle removed callback group try { interrupt_guard_condition_.trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group add: ") + ex.what()); + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } } @@ -146,15 +157,14 @@ void Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { this->collector_.add_node(node_ptr); - if (notify) - { + if (notify) { // Interrupt waiting to handle removed callback group try { interrupt_guard_condition_.trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group remove: ") + ex.what()); + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); } } } @@ -166,15 +176,14 @@ Executor::remove_callback_group( { this->collector_.remove_callback_group(group_ptr); - if (notify) - { + if (notify) { // Interrupt waiting to handle removed callback group try { interrupt_guard_condition_.trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group remove: ") + ex.what()); + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); } } } @@ -183,18 +192,6 @@ void Executor::add_node(std::shared_ptr node_ptr, bool notify) { this->add_node(node_ptr->get_node_base_interface(), notify); - - if (notify) - { - // Interrupt waiting to handle removed callback group - try { - interrupt_guard_condition_.trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on node add: ") + ex.what()); - } - } } void @@ -202,15 +199,14 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node { this->collector_.remove_node(node_ptr); - if (notify) - { + if (notify) { // Interrupt waiting to handle removed callback group try { interrupt_guard_condition_.trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group add: ") + ex.what()); + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } } @@ -356,16 +352,10 @@ Executor::execute_any_executable(AnyExecutable & any_exec) if (any_exec.waitable) { any_exec.waitable->execute(any_exec.data); } + // Reset the callback_group, regardless of type - any_exec.callback_group->can_be_taken_from().store(true); - // Wake the wait, because it may need to be recalculated or work that - // was previously blocked is now available. - try { - interrupt_guard_condition_.trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition from execute_any_executable: ") + ex.what()); + if (any_exec.callback_group) { + any_exec.callback_group->can_be_taken_from().store(true); } } @@ -506,47 +496,65 @@ Executor::execute_client( } void -Executor::wait_for_work(std::chrono::nanoseconds timeout) +Executor::collect_entities() { - TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); - { - std::lock_guard guard(mutex_); - - rclcpp::executors::ExecutorEntitiesCollection collection; - auto callback_groups = this->collector_.get_all_callback_groups(); - rclcpp::executors::build_entities_collection(callback_groups, collection); - - current_collection_.update_timers(collection.timers, - [this](auto timer){wait_set_->add_timer(timer);}, - [this](auto timer){wait_set_->remove_timer(timer);}); - - current_collection_.update_subscriptions(collection.subscriptions, - [this](auto subscription){ - wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);}, - [this](auto subscription){ - wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);}); - - current_collection_.update_clients(collection.clients, - [this](auto client){wait_set_->add_client(client);}, - [this](auto client){wait_set_->remove_client(client);}); + std::lock_guard guard(mutex_); + + rclcpp::executors::ExecutorEntitiesCollection collection; + this->collector_.update_collections(); + auto callback_groups = this->collector_.get_all_callback_groups(); + rclcpp::executors::build_entities_collection(callback_groups, collection); + + auto notify_waitable = std::static_pointer_cast(notify_waitable_); + collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}}); + + current_collection_.timers.update( + collection.timers, + [this](auto timer) {wait_set_->add_timer(timer);}, + [this](auto timer) {wait_set_->remove_timer(timer);}); + + current_collection_.subscriptions.update( + collection.subscriptions, + [this](auto subscription) { + wait_set_->add_subscription(subscription, kDefaultSubscriptionMask); + }, + [this](auto subscription) { + wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask); + }); - current_collection_.update_services(collection.services, - [this](auto service){wait_set_->add_service(service);}, - [this](auto service){wait_set_->remove_service(service);}); + current_collection_.clients.update( + collection.clients, + [this](auto client) {wait_set_->add_client(client);}, + [this](auto client) {wait_set_->remove_client(client);}); + + current_collection_.services.update( + collection.services, + [this](auto service) {wait_set_->add_service(service);}, + [this](auto service) {wait_set_->remove_service(service);}); + + current_collection_.guard_conditions.update( + collection.guard_conditions, + [this](auto guard_condition) {wait_set_->add_guard_condition(guard_condition);}, + [this](auto guard_condition) {wait_set_->remove_guard_condition(guard_condition);}); + + current_collection_.waitables.update( + collection.waitables, + [this](auto waitable) {wait_set_->add_waitable(waitable);}, + [this](auto waitable) {wait_set_->remove_waitable(waitable);}); +} - current_collection_.update_guard_conditions(collection.guard_conditions, - [this](auto guard_condition){wait_set_->add_guard_condition(guard_condition);}, - [this](auto guard_condition){wait_set_->remove_guard_condition(guard_condition);}); +void +Executor::wait_for_work(std::chrono::nanoseconds timeout) +{ + TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); - current_collection_.update_waitables(collection.waitables, - [this](auto waitable){wait_set_->add_waitable(waitable);}, - [this](auto waitable){wait_set_->remove_waitable(waitable);}); + if (current_collection_.empty()) { + this->collect_entities(); } auto wait_result = wait_set_->wait(timeout); - if (wait_result.kind() == WaitResultKind::Empty) - { + if (wait_result.kind() == WaitResultKind::Empty) { RCUTILS_LOG_WARN_NAMED( "rclcpp", "empty wait set received in wait(). This should never happen."); @@ -562,8 +570,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) TRACEPOINT(rclcpp_executor_get_next_ready); std::lock_guard guard{mutex_}; - if (ready_executables_.size() == 0) - { + if (ready_executables_.size() == 0) { return false; } diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index 2dce369b08..a10e8d694f 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -121,6 +121,7 @@ void check_ready( exec.callback_group = callback_group; if (fill_executable(exec, entity)) { executables.push_back(exec); + } else { } } } diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index 78e2db91a2..2779a9cd30 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -48,6 +48,7 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set) bool ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) { + std::cout << "ExecutorNotifyWaitable::is_ready" << std::endl; std::lock_guard lock(guard_condition_mutex_); for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) { auto rcl_guard_condition = wait_set->guard_conditions[ii]; @@ -68,6 +69,7 @@ ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) void ExecutorNotifyWaitable::execute(std::shared_ptr & data) { + std::cout << "ExecutorNotifyWaitable::execute" << std::endl; (void) data; this->execute_callback_(); } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index dd26a88773..b822e6e24f 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -680,15 +680,6 @@ if(TARGET test_entities_collector) target_link_libraries(test_entities_collector ${PROJECT_NAME} mimick) endif() -ament_add_gtest(test_entities_collector executors/test_entities_collector.cpp - APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) -if(TARGET test_entities_collector) - ament_target_dependencies(test_entities_collector - "rcl" - "test_msgs") - target_link_libraries(test_entities_collector ${PROJECT_NAME} mimick) -endif() - ament_add_gtest(test_executor_notify_waitable executors/test_executor_notify_waitable.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) if(TARGET test_executor_notify_waitable) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 54fb9af6fc..1fee748b78 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -151,7 +151,7 @@ TYPED_TEST(TestExecutorsStable, addTemporaryNode) { } // Sleep for a short time to verify executor.spin() is going, and didn't throw. - std::thread spinner([&]() {EXPECT_NO_THROW(executor.spin());}); + std::thread spinner([&]() {executor.spin();}); std::this_thread::sleep_for(50ms); executor.cancel();