Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Simplify the DdsPipe's constructor
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <danieldiaz@eprosima.com>
Tempate committed Oct 13, 2023
1 parent 6ca15b5 commit 871c871
Showing 7 changed files with 184 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -65,6 +65,13 @@ struct DdsPipeConfiguration : public IConfiguration
utils::Formatter& error_msg,
const std::map<types::ParticipantId, bool>& participants) const noexcept;

/**
* @brief Set internal values with the values reloaded.
*/
DDSPIPE_CORE_DllAPI
void reload(
const DdsPipeConfiguration& new_configuration);

/**
* @brief Select the \c RoutesConfiguration for a topic.
*
@@ -78,6 +85,13 @@ struct DdsPipeConfiguration : public IConfiguration
// VARIABLES
/////////////////////////

//! Topic lists to build the AllowedTopics
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> allowlist{};
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> blocklist{};

//! Builtin topics to create at the beggining of the execution
std::set<utils::Heritable<ddspipe::core::types::DistributedTopic>> builtin_topics{};

//! Set of fixed topics' QoS
std::vector<utils::Heritable<ddspipe::core::types::WildcardDdsFilterTopic>> manual_topics {};

@@ -89,6 +103,9 @@ struct DdsPipeConfiguration : public IConfiguration

//! Whether entities should be removed when they have no writers connected to them.
bool remove_unused_entities = false;

//! Whether the DDS Pipe should be initialized enabled.
bool init_enabled = false;
};

} /* namespace core */
32 changes: 26 additions & 6 deletions ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
@@ -62,13 +62,10 @@ class DdsPipe
DDSPIPE_CORE_DllAPI
DdsPipe(
const DdsPipeConfiguration& configuration,
const std::shared_ptr<AllowedTopicList>& allowed_topics,
const std::shared_ptr<DiscoveryDatabase>& discovery_database,
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<ParticipantsDatabase>& participants_database,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const std::set<utils::Heritable<types::DistributedTopic>>& builtin_topics = {},
bool start_enable = false);
const std::shared_ptr<utils::SlotThreadPool>& thread_pool);

/**
* @brief Destroy the DdsPipe object
@@ -96,8 +93,8 @@ class DdsPipe
* @throw \c ConfigurationException in case the new yaml is not well-formed
*/
DDSPIPE_CORE_DllAPI
utils::ReturnCode reload_allowed_topics(
const std::shared_ptr<AllowedTopicList>& allowed_topics);
utils::ReturnCode reload_configuration(
const DdsPipeConfiguration& new_configuration);

/////////////////////////
// ENABLING METHODS
@@ -129,6 +126,29 @@ class DdsPipe

protected:

/////////////////////////
// INTERACTION METHODS
/////////////////////////
/**
* @brief Load allowed topics from configuration
*
* @throw \c ConfigurationException in case the yaml inside allowlist is not well-formed
*/
void init_allowed_topics_();

/**
* @brief Reload the allowed topic configuration
*
* @param [in] configuration : new configuration
*
* @return \c RETCODE_OK if configuration has been updated correctly
* @return \c RETCODE_NO_DATA if new configuration has not changed
* @return \c RETCODE_ERROR if any other error has occurred
*
* @throw \c ConfigurationException in case the new yaml is not well-formed
*/
utils::ReturnCode reload_allowed_topics_(const std::shared_ptr<AllowedTopicList>& allowed_topics);

/////////////////////////
// CALLBACK METHODS
/////////////////////////
7 changes: 7 additions & 0 deletions ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp
Original file line number Diff line number Diff line change
@@ -45,6 +45,13 @@ bool DdsPipeConfiguration::is_valid(
topic_routes.is_valid(error_msg, participant_ids);
}

void DdsPipeConfiguration::reload(
const DdsPipeConfiguration& new_configuration)
{
this->allowlist = new_configuration.allowlist;
this->blocklist = new_configuration.blocklist;
}

RoutesConfiguration DdsPipeConfiguration::get_routes_config(
const utils::Heritable<types::DistributedTopic>& topic) const noexcept
{
149 changes: 88 additions & 61 deletions ddspipe_core/src/cpp/core/DdsPipe.cpp
Original file line number Diff line number Diff line change
@@ -30,15 +30,11 @@ using namespace eprosima::ddspipe::core::types;

DdsPipe::DdsPipe(
const DdsPipeConfiguration& configuration,
const std::shared_ptr<AllowedTopicList>& allowed_topics,
const std::shared_ptr<DiscoveryDatabase>& discovery_database,
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<ParticipantsDatabase>& participants_database,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const std::set<utils::Heritable<DistributedTopic>>& builtin_topics, /* = {} */
bool start_enable /* = false */)
const std::shared_ptr<utils::SlotThreadPool>& thread_pool)
: configuration_(configuration)
, allowed_topics_(allowed_topics)
, discovery_database_(discovery_database)
, payload_pool_(payload_pool)
, participants_database_(participants_database)
@@ -56,6 +52,10 @@ DdsPipe::DdsPipe(
"Configuration for DDS Pipe is invalid: " << error_msg);
}

// Init topic allowed
init_allowed_topics_();

// For each participant, save their manual topics
load_manual_topics_into_participants_();

// Add callback to be called by the discovery database when an Endpoint is discovered
@@ -71,13 +71,13 @@ DdsPipe::DdsPipe(
std::placeholders::_1));

// Create Bridges for builtin topics
init_bridges_nts_(builtin_topics);
init_bridges_nts_(configuration_.builtin_topics);

// Enable thread pool
thread_pool_->enable();

// Enable if set
if (start_enable)
if (configuration_.init_enabled)
{
enable();
}
@@ -116,8 +116,87 @@ DdsPipe::~DdsPipe()
logDebug(DDSPIPE, "DDS Pipe destroyed.");
}

utils::ReturnCode DdsPipe::reload_allowed_topics(
const std::shared_ptr<AllowedTopicList>& allowed_topics)
utils::ReturnCode DdsPipe::reload_configuration(
const DdsPipeConfiguration& new_configuration)
{
// Check that the configuration is correct
utils::Formatter error_msg;
if (!new_configuration.is_valid(error_msg))
{
throw utils::ConfigurationException(
utils::Formatter() <<
"Configuration for Reload DDS Router is invalid: " << error_msg);
}

auto allowed_topics = std::make_shared<ddspipe::core::AllowedTopicList>(
new_configuration.allowlist,
new_configuration.blocklist);

return reload_allowed_topics_(allowed_topics);
}

utils::ReturnCode DdsPipe::enable() noexcept
{
std::lock_guard<std::mutex> lock(mutex_);

if (!enabled_)
{
enabled_ = true;

logInfo(DDSPIPE, "Enabling DDS Pipe.");

activate_all_topics_nts_();

// Enable services discovered while pipe disabled
for (auto it : current_services_)
{
// Enable only allowed services
if (it.second)
{
rpc_bridges_[it.first]->enable();
}
}

return utils::ReturnCode::RETCODE_OK;
}
else
{
logInfo(DDSPIPE, "Trying to enable an already enabled DDS Pipe.");
return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET;
}
}

utils::ReturnCode DdsPipe::disable() noexcept
{
std::lock_guard<std::mutex> lock(mutex_);

if (enabled_)
{
enabled_ = false;

logInfo(DDSPIPE, "Disabling DDS Pipe.");

deactivate_all_topics_nts_();

return utils::ReturnCode::RETCODE_OK;
}
else
{
logInfo(DDSPIPE, "Trying to disable a disabled DDS Pipe.");
return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET;
}
}

void DdsPipe::init_allowed_topics_()
{
allowed_topics_ = std::make_shared<ddspipe::core::AllowedTopicList>(
configuration_.allowlist,
configuration_.blocklist);

logInfo(DDSROUTER, "DDS Router configured with allowed topics: " << *allowed_topics_);
}

utils::ReturnCode DdsPipe::reload_allowed_topics_(const std::shared_ptr<AllowedTopicList>& allowed_topics)
{
std::lock_guard<std::mutex> lock(mutex_);

@@ -179,58 +258,6 @@ utils::ReturnCode DdsPipe::reload_allowed_topics(
return utils::ReturnCode::RETCODE_OK;
}

utils::ReturnCode DdsPipe::enable() noexcept
{
std::lock_guard<std::mutex> lock(mutex_);

if (!enabled_)
{
enabled_ = true;

logInfo(DDSPIPE, "Enabling DDS Pipe.");

activate_all_topics_nts_();

// Enable services discovered while pipe disabled
for (auto it : current_services_)
{
// Enable only allowed services
if (it.second)
{
rpc_bridges_[it.first]->enable();
}
}

return utils::ReturnCode::RETCODE_OK;
}
else
{
logInfo(DDSPIPE, "Trying to enable an already enabled DDS Pipe.");
return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET;
}
}

utils::ReturnCode DdsPipe::disable() noexcept
{
std::lock_guard<std::mutex> lock(mutex_);

if (enabled_)
{
enabled_ = false;

logInfo(DDSPIPE, "Disabling DDS Pipe.");

deactivate_all_topics_nts_();

return utils::ReturnCode::RETCODE_OK;
}
else
{
logInfo(DDSPIPE, "Trying to disable a disabled DDS Pipe.");
return utils::ReturnCode::RETCODE_PRECONDITION_NOT_MET;
}
}

void DdsPipe::load_manual_topics_into_participants_() noexcept
{
for (const auto& manual_topic : configuration_.manual_topics)
Loading

0 comments on commit 871c871

Please sign in to comment.