Skip to content

Commit

Permalink
Utilize rclcpp::WaitSet as part of the executors
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carroll <[email protected]>
  • Loading branch information
mjcarroll committed Apr 3, 2023
1 parent 6267741 commit 974e845
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 912 deletions.
2 changes: 0 additions & 2 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
src/rclcpp/executors/static_single_threaded_executor.cpp
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/future_return_code.cpp
src/rclcpp/generic_publisher.cpp
Expand Down
169 changes: 15 additions & 154 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cassert>
#include <chrono>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <map>
Expand All @@ -29,26 +30,24 @@

#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/executor_entities_collector.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_set.hpp"

namespace rclcpp
{

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;

// Forward declaration is used in convenience method signature.
class Node;

Expand Down Expand Up @@ -402,17 +401,6 @@ class Executor
void
cancel();

/// Support dynamic switching of the memory strategy.
/**
* Switching the memory strategy while the executor is spinning in another threading could have
* unintended consequences.
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
* \throws std::runtime_error if memory_strategy is null
*/
RCLCPP_PUBLIC
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);

/// Returns true if the executor is currently spinning.
/**
* This function can be called asynchronously from any thread.
Expand Down Expand Up @@ -497,6 +485,10 @@ class Executor
static void
execute_client(rclcpp::ClientBase::SharedPtr client);

RCLCPP_PUBLIC
void
collect_entities();

/// Block until more work becomes avilable or timeout is reached.
/**
* Builds a set of waitable entities, which are passed to the middleware.
Expand All @@ -508,62 +500,6 @@ class Executor
void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Find node associated with a callback group
/**
* \param[in] weak_groups_to_nodes map of callback groups to nodes
* \param[in] group callback group to find assocatiated node
* \return Pointer to associated node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);

/// Return true if the node has been added to this executor.
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
* \param[in] weak_groups_to_nodes map to nodes to lookup
* \return true if the node is associated with the executor, otherwise false
*/
RCLCPP_PUBLIC
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;

/// Find the callback group associated with a timer
/**
* \param[in] timer Timer to find associated callback group
* \return Pointer to callback group node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);

/// Add a callback group to an executor
/**
* \see rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
virtual void
add_callback_group_to_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Remove a callback group from the executor.
/**
* \see rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
virtual void
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Check for executable in ready state and populate union structure.
/**
* \param[out] any_executable populated union structure of ready executable
Expand All @@ -574,33 +510,6 @@ class Executor
bool
get_next_ready_executable(AnyExecutable & any_executable);

/// Check for executable in ready state and populate union structure.
/**
* This is the implementation of get_next_ready_executable that takes into
* account the current state of callback groups' association with nodes and
* executors.
*
* This checks in a particular order for available work:
* * Timers
* * Subscriptions
* * Services
* * Clients
* * Waitable
*
* If the next executable is not associated with this executor/node pair,
* then this method will return false.
*
* \param[out] any_executable populated union structure of ready executable
* \param[in] weak_groups_to_nodes mapping of callback groups to nodes
* \return true if an executable was ready and any_executable was populated,
* otherwise false
*/
RCLCPP_PUBLIC
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);

/// Wait for executable in ready state and populate union structure.
/**
* If an executable is ready, it will return immediately, otherwise
Expand All @@ -618,21 +527,6 @@ class Executor
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Add all callback groups that can be automatically added from associated nodes.
/**
* The executor, before collecting entities, verifies if any callback group from
* nodes associated with the executor, which is not already associated to an executor,
* can be automatically added to this executor.
* This takes care of any callback group that has been added to a node but not explicitly added
* to the executor.
* It is important to note that in order for the callback groups to be automatically added to an
* executor through this function, the node of the callback groups needs to have been added
* through the `add_node` method.
*/
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

Expand All @@ -642,16 +536,8 @@ class Executor
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

// Mutex to protect the subsequent memory_strategy_.
mutable std::mutex mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);

/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;

Expand All @@ -661,39 +547,14 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups to guard conditions
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups to nodes associated with executor
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
rclcpp::executors::ExecutorEntitiesCollector collector_;

/// maps all callback groups to nodes
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
rclcpp::executors::ExecutorEntitiesCollection current_collection_;
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_;

/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::shared_ptr<rclcpp::WaitSet> wait_set_;
std::deque<rclcpp::AnyExecutable> ready_executables_;

/// shutdown callback handle registered to Context
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;
Expand Down
1 change: 0 additions & 1 deletion rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include "rclcpp/executors/multi_threaded_executor.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RCLCPP__WAIT_SET_POLICIES__DETAIL__STORAGE_POLICY_COMMON_HPP_
#define RCLCPP__WAIT_SET_POLICIES__DETAIL__STORAGE_POLICY_COMMON_HPP_

#include <iostream>
#include <memory>
#include <stdexcept>
#include <utility>
Expand Down Expand Up @@ -79,13 +80,16 @@ class StoragePolicyCommon
throw std::runtime_error("unexpected condition, fixed storage policy needs pruning");
}
// Flag for pruning.
std::cout << "waitable needs pruning" << std::endl;
needs_pruning_ = true;
continue;
}

rclcpp::Waitable & waitable = *waitable_ptr_pair.second;
subscriptions_from_waitables += waitable.get_number_of_ready_subscriptions();
guard_conditions_from_waitables += waitable.get_number_of_ready_guard_conditions();
std::cout << "guard_conditions_from_waitables: " << guard_conditions_from_waitables <<
std::endl;
timers_from_waitables += waitable.get_number_of_ready_timers();
clients_from_waitables += waitable.get_number_of_ready_clients();
services_from_waitables += waitable.get_number_of_ready_services();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,11 @@ class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon
// storage wait sets since they cannot be changed after construction.
// This will also clear the wait set and re-add all the entities, which
// prepares it to be waited on again.
std::cout << "rebuild_rcl_wait_set" << std::endl;
rebuild_rcl_wait_set();
}

std::cout << "get_rcl_wait_set" << std::endl;
rcl_wait_set_t & rcl_wait_set = get_rcl_wait_set();

// Wait unconditionally until timeout condition occurs since we assume
Expand All @@ -297,6 +299,7 @@ class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon
// It is ok to wait while not having the lock acquired, because the state
// in the rcl wait set will not be updated until this method calls
// rebuild_rcl_wait_set().
std::cout << "rcl_wait" << std::endl;
rcl_ret_t ret = rcl_wait(&rcl_wait_set, time_left_to_wait_ns.count());
if (RCL_RET_OK == ret) {
// Something has become ready in the wait set, first check if it was
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/wait_set_template.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli
this->storage_acquire_ownerships();
RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();});


// this method comes from the SynchronizationPolicy
return this->template sync_wait<WaitResult<WaitSetTemplate>>(
// pass along the time_to_wait duration as nanoseconds
Expand Down
Loading

0 comments on commit 974e845

Please sign in to comment.