Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create common structures for executors to use #2143

Merged
merged 25 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2bf88de
Deprecate callback_group call taking context
mjcarroll Mar 29, 2023
9099635
Add base executor objects that can be used by implementors
mjcarroll Mar 29, 2023
2426056
Template common operations
mjcarroll Mar 29, 2023
173ffd6
Address reviewer feedback:
mjcarroll Mar 29, 2023
a524bf0
Lint
mjcarroll Mar 29, 2023
89f2106
Address reviewer feedback and fix templates
mjcarroll Mar 30, 2023
9695eaa
Merge branch 'rolling' into mjcarroll/executor_structures
mjcarroll Mar 30, 2023
e173e5a
Lint and docs
mjcarroll Mar 30, 2023
653d1a3
Make executor own the notify waitable
mjcarroll Mar 31, 2023
a6c4c1b
Add pending queue to collector, remove from waitable
mjcarroll Apr 3, 2023
9dd48ce
Change interrupt guard condition to shared_ptr
mjcarroll Apr 3, 2023
6267741
Lint and docs
mjcarroll Apr 3, 2023
1b1a915
Don't exchange atomic twice
mjcarroll Apr 3, 2023
0a9c9a6
Fix add_node and add more tests
mjcarroll Apr 3, 2023
0ae0bea
Make get_notify_guard_condition follow API tick-tock
mjcarroll Apr 3, 2023
87f41bf
Improve callback group tick-tocking
mjcarroll Apr 3, 2023
5809328
Don't lock twice
mjcarroll Apr 3, 2023
debe396
Address reviewer feedback
mjcarroll Apr 4, 2023
c4b6589
Add thread safety annotations and make locks consistent
mjcarroll Apr 4, 2023
cd7aaba
Address reviewer feedback
mjcarroll Apr 11, 2023
6379f0c
Remove the "add_valid_node" API
mjcarroll Apr 12, 2023
855c64d
Only notify if the trigger condition is valid
mjcarroll Apr 12, 2023
d9a9206
Only trigger if valid and needed
mjcarroll Apr 13, 2023
838d1ae
Merge branch 'rolling' into mjcarroll/executor_structures
mjcarroll Apr 13, 2023
ab3bbf4
Merge branch 'rolling' into mjcarroll/executor_structures
mjcarroll Apr 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executable_list.cpp
src/rclcpp/executor.cpp
src/rclcpp/executors.cpp
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/executor_entities_collector.cpp
src/rclcpp/executors/executor_entities_collection.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
Expand Down
32 changes: 31 additions & 1 deletion rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@ class CallbackGroup
* added to the executor in either case.
*
* \param[in] group_type The type of the callback group.
* \param[in] get_node_context Lambda to retrieve the node context when
* checking that the creating node is valid and using the guard condition.
* \param[in] automatically_add_to_executor_with_node A boolean that
* determines whether a callback group is automatically added to an executor
* with the node with which it is associated.
*/
RCLCPP_PUBLIC
explicit CallbackGroup(
CallbackGroupType group_type,
std::function<rclcpp::Context::SharedPtr(void)> get_node_context,
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
bool automatically_add_to_executor_with_node = true);

/// Default destructor.
Expand Down Expand Up @@ -137,6 +140,18 @@ class CallbackGroup
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
}

/// Return if the node that created this callback group still exists
/**
* As nodes can share ownership of callback groups with an executor, this
* may be used to ensures that the executor doesn't operate on a callback
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
* group that has outlived it's creating node.
*
* \return true if the creating node still exists, otherwise false
*/
RCLCPP_PUBLIC
bool
has_valid_node();
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved

RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();
Expand Down Expand Up @@ -178,11 +193,24 @@ class CallbackGroup
bool
automatically_add_to_executor_with_node() const;

/// Defer creating the notify guard condition and return it.
/// Retrieve the guard condition used to signal changes to this callback group.
/**
* \param[in] context_ptr context to use when creating the guard condition
* \return guard condition if it is valid, otherwise nullptr.
*/
[[deprecated("Use get_notify_guard_condition() without arguments")]]
RCLCPP_PUBLIC
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr);

/// Retrieve the guard condition used to signal changes to this callback group.
/**
* \return guard condition if it is valid, otherwise nullptr.
*/
RCLCPP_PUBLIC
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition();

/// Trigger the notify guard condition.
RCLCPP_PUBLIC
void
Expand Down Expand Up @@ -234,6 +262,8 @@ class CallbackGroup
std::shared_ptr<rclcpp::GuardCondition> notify_guard_condition_ = nullptr;
std::recursive_mutex notify_guard_condition_mutex_;

std::function<rclcpp::Context::SharedPtr(void)> get_context_;

private:
template<typename TypeT, typename Function>
typename TypeT::SharedPtr _find_ptrs_if_impl(
Expand Down
137 changes: 137 additions & 0 deletions rclcpp/include/rclcpp/executors/executor_entities_collection.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_
#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_

#include <deque>
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
#include <unordered_map>
#include <vector>

#include "rcpputils/thread_safety_annotations.hpp"

#include <rclcpp/any_executable.hpp>
#include <rclcpp/node_interfaces/node_base.hpp>
#include <rclcpp/callback_group.hpp>
#include <rclcpp/executors/executor_notify_waitable.hpp>
#include <rclcpp/visibility_control.hpp>
#include <rclcpp/wait_result.hpp>
#include <rclcpp/wait_set.hpp>

namespace rclcpp
{
namespace executors
{

struct ExecutorEntitiesCollection
{
struct TimerEntry
{
rclcpp::TimerBase::WeakPtr timer;
rclcpp::CallbackGroup::WeakPtr callback_group;
};
using TimerCollection = std::unordered_map<const rcl_timer_t *, TimerEntry>;

struct SubscriptionEntry
{
rclcpp::SubscriptionBase::WeakPtr subscription;
rclcpp::CallbackGroup::WeakPtr callback_group;
};
using SubscriptionCollection = std::unordered_map<const rcl_subscription_t *, SubscriptionEntry>;

struct ClientEntry
{
rclcpp::ClientBase::WeakPtr client;
rclcpp::CallbackGroup::WeakPtr callback_group;
};
using ClientCollection = std::unordered_map<const rcl_client_t *, ClientEntry>;

struct ServiceEntry
{
rclcpp::ServiceBase::WeakPtr service;
rclcpp::CallbackGroup::WeakPtr callback_group;
};
using ServiceCollection = std::unordered_map<const rcl_service_t *, ServiceEntry>;

struct WaitableEntry
{
rclcpp::Waitable::WeakPtr waitable;
rclcpp::CallbackGroup::WeakPtr callback_group;
};
using WaitableCollection = std::unordered_map<const rclcpp::Waitable *, WaitableEntry>;

using GuardConditionCollection = std::unordered_map<const rcl_guard_condition_t *,
rclcpp::GuardCondition::WeakPtr>;

TimerCollection timers;
SubscriptionCollection subscriptions;
ClientCollection clients;
ServiceCollection services;
GuardConditionCollection guard_conditions;
WaitableCollection waitables;

void clear();

using TimerUpdatedFunc = std::function<void (rclcpp::TimerBase::SharedPtr)>;
void update_timers(
const ExecutorEntitiesCollection::TimerCollection & other,
TimerUpdatedFunc timer_added,
TimerUpdatedFunc timer_removed);

using SubscriptionUpdatedFunc = std::function<void (rclcpp::SubscriptionBase::SharedPtr)>;
void update_subscriptions(
const ExecutorEntitiesCollection::SubscriptionCollection & other,
SubscriptionUpdatedFunc subscription_added,
SubscriptionUpdatedFunc subscription_removed);

using ClientUpdatedFunc = std::function<void (rclcpp::ClientBase::SharedPtr)>;
void update_clients(
const ExecutorEntitiesCollection::ClientCollection & other,
ClientUpdatedFunc client_added,
ClientUpdatedFunc client_removed);

using ServiceUpdatedFunc = std::function<void (rclcpp::ServiceBase::SharedPtr)>;
void update_services(
const ExecutorEntitiesCollection::ServiceCollection & other,
ServiceUpdatedFunc service_added,
ServiceUpdatedFunc service_removed);

using GuardConditionUpdatedFunc = std::function<void (rclcpp::GuardCondition::SharedPtr)>;
void update_guard_conditions(
const ExecutorEntitiesCollection::GuardConditionCollection & other,
GuardConditionUpdatedFunc guard_condition_added,
GuardConditionUpdatedFunc guard_condition_removed);

using WaitableUpdatedFunc = std::function<void (rclcpp::Waitable::SharedPtr)>;
void update_waitables(
const ExecutorEntitiesCollection::WaitableCollection & other,
WaitableUpdatedFunc waitable_added,
WaitableUpdatedFunc waitable_removed);
};

void
build_entities_collection(
const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
ExecutorEntitiesCollection & collection);

std::deque<rclcpp::AnyExecutable>
ready_executables(
const ExecutorEntitiesCollection & collection,
rclcpp::WaitResult<rclcpp::WaitSet> & wait_result
);

} // namespace executors
} // namespace rclcpp

#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_
151 changes: 151 additions & 0 deletions rclcpp/include/rclcpp/executors/executor_entities_collector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_
#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_

#include <list>
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
#include <memory>
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
#include <set>
#include <vector>

#include "rcpputils/thread_safety_annotations.hpp"

#include <rclcpp/any_executable.hpp>
#include <rclcpp/node_interfaces/node_base.hpp>
#include <rclcpp/callback_group.hpp>
#include <rclcpp/executors/executor_notify_waitable.hpp>
#include <rclcpp/visibility_control.hpp>
#include <rclcpp/wait_set.hpp>
#include <rclcpp/wait_result.hpp>

namespace rclcpp
{
namespace executors
{

class ExecutorEntitiesCollector
{
public:
RCLCPP_PUBLIC
ExecutorEntitiesCollector();

RCLCPP_PUBLIC
~ExecutorEntitiesCollector();

/// Add a node to the entity collector
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
* \throw std::runtime_error if the node is associated with an executor
*/
RCLCPP_PUBLIC
void
add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);

/// Remove a node from the entity collector
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
* \throw std::runtime_error if the node is associated with an executor
* \throw std::runtime_error if the node is associated with this executor
*/
RCLCPP_PUBLIC
void
remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);

RCLCPP_PUBLIC
bool
has_node(const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);

/// Add a callback group to the entity collector
/**
* \param[in] group_ptr a shared pointer that points to a callback group
* \throw std::runtime_error if the callback_group is associated with an executor
*/
RCLCPP_PUBLIC
void
add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr);

/// Remove a callback group from the entity collector
/**
* \param[in] group_ptr a shared pointer that points to a callback group
* \throw std::runtime_error if the callback_group is not associated with an executor
* \throw std::runtime_error if the callback_group is not associated with this executor
*/
RCLCPP_PUBLIC
void
remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr);

RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_all_callback_groups();
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved

RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_manually_added_callback_groups();
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved

RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_automatically_added_callback_groups();
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved

RCLCPP_PUBLIC
void
update_collections();

RCLCPP_PUBLIC
std::shared_ptr<ExecutorNotifyWaitable>
get_executor_notify_waitable();

protected:
using CallbackGroupCollection = std::set<
rclcpp::CallbackGroup::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>;

RCLCPP_PUBLIC
void
add_callback_group_to_collection(
rclcpp::CallbackGroup::SharedPtr group_ptr,
CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_);

RCLCPP_PUBLIC
void
remove_callback_group_from_collection(
rclcpp::CallbackGroup::SharedPtr group_ptr,
CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_);

RCLCPP_PUBLIC
void
add_automatically_associated_callback_groups() RCPPUTILS_TSA_REQUIRES(mutex_);

RCLCPP_PUBLIC
void
prune_invalid_nodes_and_groups() RCPPUTILS_TSA_REQUIRES(mutex_);

mutable std::mutex mutex_;

CallbackGroupCollection
manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

CallbackGroupCollection
automatically_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

std::shared_ptr<ExecutorNotifyWaitable> notify_waitable_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace executors
} // namespace rclcpp

#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_
Loading