Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Enhanced Thread Pool in Router #310

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions ddsrouter_cmake/cmake/test/test_target.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,24 @@ function(add_test_executable TEST_EXECUTABLE_NAME TEST_SOURCES TEST_NAME TEST_LI

get_win32_path_dependencies(${TEST_EXECUTABLE_NAME} TEST_FRIENDLY_PATH)

foreach(test_name ${TEST_LIST})
add_test(NAME ${TEST_NAME}.${test_name}
COMMAND ${TEST_EXECUTABLE_NAME}
--gtest_filter=${TEST_NAME}.${test_name}:**/${TEST_NAME}.${test_name}/**)

if(TEST_FRIENDLY_PATH)
set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}")
endif(TEST_FRIENDLY_PATH)
endforeach()
if( TEST_LIST )
# If list of tests is not empty, add each test separatly
foreach(test_name ${TEST_LIST})
add_test(NAME ${TEST_NAME}.${test_name}
COMMAND ${TEST_EXECUTABLE_NAME}
--gtest_filter=${TEST_NAME}**.${test_name}:**/${TEST_NAME}**.${test_name}/**)

if(TEST_FRIENDLY_PATH)
set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}")
endif(TEST_FRIENDLY_PATH)
endforeach()
else()
# If no tests are provided, create a single test
message(STATUS "Creating general test ${TEST_NAME}.")
add_test(NAME ${TEST_NAME}
COMMAND ${TEST_EXECUTABLE_NAME})
endif( TEST_LIST )


target_compile_definitions(${TEST_EXECUTABLE_NAME}
PRIVATE FASTDDS_ENFORCE_LOG_INFO
Expand Down
4 changes: 2 additions & 2 deletions ddsrouter_core/src/cpp/communication/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ using namespace eprosima::ddsrouter::core::types;
Bridge::Bridge(
std::shared_ptr<ParticipantsDatabase> participants_database,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool)
std::shared_ptr<utils::thread::IManager> thread_manager)
: participants_(participants_database)
, payload_pool_(payload_pool)
, thread_pool_(thread_pool)
, thread_manager_(thread_manager)
, enabled_(false)
{
}
Expand Down
6 changes: 3 additions & 3 deletions ddsrouter_core/src/cpp/communication/Bridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <core/ParticipantsDatabase.hpp>
#include <ddsrouter_core/types/participant/ParticipantId.hpp>
#include <ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp>
#include <ddsrouter_utils/thread/manager/IManager.hpp>

namespace eprosima {
namespace ddsrouter {
Expand Down Expand Up @@ -51,7 +51,7 @@ class Bridge
Bridge(
std::shared_ptr<ParticipantsDatabase> participants_database,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool);
std::shared_ptr<utils::thread::IManager> thread_manager);

/**
* Enable bridge
Expand All @@ -72,7 +72,7 @@ class Bridge
std::shared_ptr<PayloadPool> payload_pool_;

//! Common shared thread pool
std::shared_ptr<utils::SlotThreadPool> thread_pool_;
std::shared_ptr<utils::thread::IManager> thread_manager_;

//! Whether the Bridge is currently enabled
std::atomic<bool> enabled_;
Expand Down
6 changes: 3 additions & 3 deletions ddsrouter_core/src/cpp/communication/DDSBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ DDSBridge::DDSBridge(
const RealTopic& topic,
std::shared_ptr<ParticipantsDatabase> participants_database,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool,
std::shared_ptr<utils::thread::IManager> thread_manager,
bool enable /* = false */)
: Bridge(participants_database, payload_pool, thread_pool)
: Bridge(participants_database, payload_pool, thread_manager)
, topic_(topic)
{
logDebug(DDSROUTER_DDSBRIDGE, "Creating DDSBridge " << *this << ".");
Expand Down Expand Up @@ -77,7 +77,7 @@ DDSBridge::DDSBridge(
id,
readers_[id], std::move(writers_except_one),
payload_pool_,
thread_pool,
thread_manager,
false);
}

Expand Down
4 changes: 2 additions & 2 deletions ddsrouter_core/src/cpp/communication/DDSBridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class DDSBridge : public Bridge
* @param topic: Topic of which this Bridge manages communication
* @param participant_database: Collection of Participants to manage communication
* @param payload_pool: Payload Pool that handles the reservation/release of payloads throughout the DDS Router
* @param thread_pool: Shared pool of threads in charge of data transmission.
* @param thread_manager: Shared pool of threads in charge of data transmission.
* @param enable: Whether the Bridge should be initialized as enabled
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
Expand All @@ -60,7 +60,7 @@ class DDSBridge : public Bridge
const types::RealTopic& topic,
std::shared_ptr<ParticipantsDatabase> participants_database,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool,
std::shared_ptr<utils::thread::IManager> thread_manager,
bool enable = false);

/**
Expand Down
15 changes: 4 additions & 11 deletions ddsrouter_core/src/cpp/communication/Track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

#include <ddsrouter_utils/exception/UnsupportedException.hpp>
#include <ddsrouter_utils/Log.hpp>
#include <ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp>
#include <ddsrouter_utils/thread_pool/task/TaskId.hpp>

#include <communication/Track.hpp>

Expand All @@ -38,7 +36,7 @@ Track::Track(
std::shared_ptr<IReader> reader,
std::map<ParticipantId, std::shared_ptr<IWriter>>&& writers,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool,
std::shared_ptr<utils::thread::IManager> thread_manager,
bool enable /* = false */) noexcept
: reader_participant_id_(reader_participant_id)
, topic_(topic)
Expand All @@ -48,19 +46,14 @@ Track::Track(
, enabled_(false)
, exit_(false)
, data_available_status_(DataAvailableStatus::no_more_data)
, thread_pool_(thread_pool)
, transmit_task_id_(utils::new_unique_task_id())
, thread_manager_(thread_manager)
, thread_manager_slot_connector_(thread_manager.get(), std::bind(&Track::transmit_, this))
{
logDebug(DDSROUTER_TRACK, "Creating Track " << *this << ".");

// Set this track to on_data_available lambda call
reader_->set_on_data_available_callback(std::bind(&Track::data_available_, this));

// Set slot in thread pool
thread_pool_->slot(
transmit_task_id_,
std::bind(&Track::transmit_, this));

if (enable)
{
// Activate Track
Expand Down Expand Up @@ -155,7 +148,7 @@ void Track::data_available_() noexcept
{
// no_more_data was set as current status, so no thread was running
// (and will not start as 2 is set as new current status)
thread_pool_->emit(transmit_task_id_);
thread_manager_slot_connector_.execute();
logDebug(DDSROUTER_TRACK, "Track " << *this << " send callback to queue.");
}
}
Expand Down
9 changes: 5 additions & 4 deletions ddsrouter_core/src/cpp/communication/Track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
#include <participant/IParticipant.hpp>
#include <reader/IReader.hpp>
#include <writer/IWriter.hpp>
#include <ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp>
#include <ddsrouter_utils/thread/manager/IManager.hpp>
#include <ddsrouter_utils/thread/connector/SlotConnector.hpp>

namespace eprosima {
namespace ddsrouter {
Expand Down Expand Up @@ -55,7 +56,7 @@ class Track
std::shared_ptr<IReader> reader,
std::map<types::ParticipantId, std::shared_ptr<IWriter>>&& writers,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool,
std::shared_ptr<utils::thread::IManager> thread_manager,
bool enable = false) noexcept;

/**
Expand Down Expand Up @@ -206,9 +207,9 @@ class Track
*/
std::mutex on_transmission_mutex_;

utils::TaskId transmit_task_id_;
utils::thread::SimpleSlotConnector thread_manager_slot_connector_;

std::shared_ptr<utils::SlotThreadPool> thread_pool_;
std::shared_ptr<utils::thread::IManager> thread_manager_;

static const unsigned int MAX_MESSAGES_TRANSMIT_LOOP_;

Expand Down
42 changes: 27 additions & 15 deletions ddsrouter_core/src/cpp/communication/rpc/RPCBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ RPCBridge::RPCBridge(
const RPCTopic& topic,
std::shared_ptr<ParticipantsDatabase> participants_database,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool)
: Bridge(participants_database, payload_pool, thread_pool)
std::shared_ptr<utils::thread::IManager> thread_manager)
: Bridge(participants_database, payload_pool, thread_manager)
, topic_(topic)
, init_(false)
{
Expand Down Expand Up @@ -282,11 +282,18 @@ void RPCBridge::data_available_(

// Protected by internal RTPS Reader mutex, as called within \c onNewCacheChangeAdded callback
// This method is also called from Reader's \c enable_ , so Reader's mutex must also be taken there beforehand
std::pair<bool, utils::TaskId>& task = tasks_map_[reader_guid];
if (!task.first)
auto it = tasks_map_.find(reader_guid);
if (it == tasks_map_.end())
{
task.first = true;
thread_pool_->emit(task.second);
utils::tsnh(STR_ENTRY << "No task for guid: " << reader_guid);
}
auto& task_pair = it->second;

// auto& task_pair = tasks_map_[reader_guid];
if (!task_pair.first)
{
task_pair.first = true;
task_pair.second.execute();
logDebug(DDSROUTER_RPCBRIDGE, "RPCBridge " << *this <<
" - " << reader_guid << " send callback to queue.");
}
Expand Down Expand Up @@ -326,7 +333,12 @@ void RPCBridge::transmit_(
}

// Finish transmission
tasks_map_[reader->guid()].first = false;
auto it = tasks_map_.find(reader->guid());
if (it == tasks_map_.end())
{
utils::tsnh(STR_ENTRY << "No task for guid: " << reader->guid());
}
it->second.first = false;

return;
}
Expand Down Expand Up @@ -470,14 +482,14 @@ void RPCBridge::create_slot_(
});

// Set slot in thread pool for this reader
utils::TaskId task_id = utils::new_unique_task_id();
thread_pool_->slot(
task_id,
[=]()
{
transmit_(reader);
});
tasks_map_[reader_guid] = {false, task_id};
tasks_map_.emplace(
reader_guid,
std::make_pair(
false,
utils::thread::SimpleSlotConnector(
thread_manager_.get(),
[=](){ transmit_(reader); })
));
}

std::ostream& operator <<(
Expand Down
13 changes: 9 additions & 4 deletions ddsrouter_core/src/cpp/communication/rpc/RPCBridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
#include <set>
#include <shared_mutex>

#include <communication/Bridge.hpp>
#include <ddsrouter_utils/thread/connector/SlotConnector.hpp>

#include <communication/Bridge.hpp>
#include <communication/rpc/ServiceRegistry.hpp>
#include <ddsrouter_core/types/dds/Guid.hpp>
#include <ddsrouter_core/types/topic/RPCTopic.hpp>
Expand Down Expand Up @@ -60,15 +61,15 @@ class RPCBridge : public Bridge
* @param topic: Topic (service) of which this RPCBridge manages communication
* @param participant_database: Collection of Participants to manage communication
* @param payload_pool: Payload Pool that handles the reservation/release of payloads throughout the DDS Router
* @param thread_pool: Shared pool of threads in charge of data transmission.
* @param thread_manager: Shared pool of threads in charge of data transmission.
*
* @note Always created disabled, manual enable required. First enable creates all endpoints.
*/
RPCBridge(
const types::RPCTopic& topic,
std::shared_ptr<ParticipantsDatabase> participants_database,
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<utils::SlotThreadPool> thread_pool);
std::shared_ptr<utils::thread::IManager> thread_manager);

/**
* @brief Destructor
Expand Down Expand Up @@ -175,7 +176,11 @@ class RPCBridge : public Bridge
std::map<types::ParticipantId, std::shared_ptr<rtps::Writer>> request_writers_;

//! Map readers' GUIDs to their associated thread pool tasks, and also keep a task emission flag.
std::map<types::Guid, std::pair<bool, utils::TaskId>> tasks_map_;
std::map<
types::Guid,
std::pair<
bool,
utils::thread::SimpleSlotConnector>> tasks_map_;

/**
* Registry of requests received, with all the information needed to send the future reply back to the requester.
Expand Down
14 changes: 9 additions & 5 deletions ddsrouter_core/src/cpp/core/DDSRouterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ddsrouter_utils/exception/InitializationException.hpp>
#include <ddsrouter_utils/exception/InconsistencyException.hpp>
#include <ddsrouter_utils/Log.hpp>
#include <ddsrouter_utils/thread/manager/StdThreadPool.hpp>

#include <ddsrouter_core/configuration/DDSRouterConfiguration.hpp>

Expand All @@ -45,10 +46,13 @@ DDSRouterImpl::DDSRouterImpl(
, discovery_database_(new DiscoveryDatabase())
, configuration_(configuration)
, enabled_(false)
, thread_pool_(std::make_shared<utils::SlotThreadPool>(configuration_.number_of_threads))
{
logDebug(DDSROUTER, "Creating DDS Router.");

// NOTE: setting it here to avoid order issue in variables construction
thread_pool_ = new utils::thread::StdThreadPool(configuration_.number_of_threads, false);
thread_manager_ = std::shared_ptr<utils::thread::IManager>(thread_pool_);

// Check that the configuration is correct
utils::Formatter error_msg;
if (!configuration_.is_valid(error_msg))
Expand Down Expand Up @@ -256,7 +260,7 @@ utils::ReturnCode DDSRouterImpl::start_() noexcept
logInfo(DDSROUTER, "Starting DDS Router.");

// Enable thread pool
thread_pool_->enable();
thread_pool_->start();

activate_all_topics_();

Expand Down Expand Up @@ -290,7 +294,7 @@ utils::ReturnCode DDSRouterImpl::stop_() noexcept
logInfo(DDSROUTER, "Stopping DDS Router.");

// Disable thread pool so tasks running finish and new tasks are not taken by threads
thread_pool_->disable();
thread_pool_->stop();

deactivate_all_topics_();
return utils::ReturnCode::RETCODE_OK;
Expand Down Expand Up @@ -491,7 +495,7 @@ void DDSRouterImpl::create_new_bridge(

try
{
bridges_[topic] = std::make_unique<DDSBridge>(topic, participants_database_, payload_pool_, thread_pool_,
bridges_[topic] = std::make_unique<DDSBridge>(topic, participants_database_, payload_pool_, thread_manager_,
enabled);
}
catch (const utils::InitializationException& e)
Expand All @@ -510,7 +514,7 @@ void DDSRouterImpl::create_new_service(
logInfo(DDSROUTER, "Creating Service: " << topic << ".");

// Endpoints not created until enabled for the first time, so no exception can be thrown
rpc_bridges_[topic] = std::make_unique<RPCBridge>(topic, participants_database_, payload_pool_, thread_pool_);
rpc_bridges_[topic] = std::make_unique<RPCBridge>(topic, participants_database_, payload_pool_, thread_manager_);
}

void DDSRouterImpl::activate_topic_(
Expand Down
17 changes: 15 additions & 2 deletions ddsrouter_core/src/cpp/core/DDSRouterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#include <mutex>

#include <ddsrouter_utils/ReturnCode.hpp>
#include <ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp>
#include <ddsrouter_utils/thread/manager/IManager.hpp>
#include <ddsrouter_utils/thread/manager/StdThreadPool.hpp>

#include <communication/DDSBridge.hpp>
#include <communication/rpc/RPCBridge.hpp>
Expand Down Expand Up @@ -342,7 +343,19 @@ class DDSRouterImpl
//! Internal mutex for concurrent calls
std::recursive_mutex mutex_;

std::shared_ptr<utils::SlotThreadPool> thread_pool_;
/**
* @warning \c thread_manager_ and \c thread_pool_ reference the same object.
*
* @brief Couple of variables that reference the same object StdThreadPool.
*
* While \c thread_pool_ is used internally to start and stop Thread Pool (and methods specific from ThreadPool
* and not from IManager) \c thread_manager_ is used to pass it to other entities as a shared entity that need
* to use a IManager.
*
* The use of both variables is just to avoid the casting when using StdThreadPool specific methods.
*/
utils::thread::StdThreadPool* thread_pool_;
std::shared_ptr<utils::thread::IManager> thread_manager_;
};

} /* namespace core */
Expand Down
Loading