Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local DDS Router filtering #284

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions ddsrouter_core/include/ddsrouter_core/types/common/types.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file types.hpp
*
* Contains auxiliary data types.
*/

#ifndef _DDSROUTERCORE_TYPES_COMMON_TYPES_HPP_
#define _DDSROUTERCORE_TYPES_COMMON_TYPES_HPP_

#include <set>

#include <ddsrouter_utils/types/Atomicable.hpp>

#include <ddsrouter_core/library/library_dll.h>
#include <ddsrouter_core/types/dds/Guid.hpp>

namespace eprosima {
namespace ddsrouter {
namespace core {
namespace types {

using GuidPrefixDataFilterType = utils::SharedAtomicable<std::set<types::GuidPrefix>>;


} /* namespace types */
} /* namespace core */
} /* namespace ddsrouter */
} /* namespace eprosima */

#endif /* _DDSROUTERCORE_TYPES_COMMON_TYPES_HPP_ */
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ CommonParticipant::CommonParticipant(
unsigned int max_history_depth)
: BaseParticipant(participant_configuration, payload_pool, discovery_database)
, max_history_depth_(max_history_depth)
, target_guids_writer_filter_(std::make_shared<types::GuidPrefixDataFilterType>())
{
create_participant_(
domain_id,
participant_attributes);

// Add this same Participant to be filtered by the Writer
add_filter_guidprefix_(rtps_participant_->getGuid().guidPrefix);
}

CommonParticipant::~CommonParticipant()
Expand All @@ -67,20 +71,33 @@ void CommonParticipant::onParticipantDiscovery(
{
if (info.status == fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
logInfo(DDSROUTER_DISCOVERY,
"Found in Participant " << this->id_nts_() << " new Participant " << info.info.m_guid << ".");
// Check whether this Participant belongs to a local Dds Router and is such, filter its sources
if (is_local_ddsrouter_participant_(info))
{
// Found local DDS Router, so filtering it
add_filter_guidprefix_(info.info.m_guid.guidPrefix);
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY,
"Found in Participant " << this->id_nts_() <<
" new Local Participant " << info.info.m_guid <<
" that belongs to a DDS Router, so filtering it.");
}
else
{
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY,
"Found in Participant " << this->id_nts_() << " new Participant " << info.info.m_guid << ".");
}
}
else if (info.status == fastrtps::rtps::ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT)
{
logInfo(DDSROUTER_DISCOVERY, "Participant " << info.info.m_guid << " changed QoS.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Participant " << info.info.m_guid << " changed QoS.");
}
else if (info.status == fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
logInfo(DDSROUTER_DISCOVERY, "Participant " << info.info.m_guid << " removed.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Participant " << info.info.m_guid << " removed.");
}
else
{
logInfo(DDSROUTER_DISCOVERY, "Participant " << info.info.m_guid << " dropped.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Participant " << info.info.m_guid << " dropped.");
}
}
}
Expand Down Expand Up @@ -142,27 +159,27 @@ void CommonParticipant::onReaderDiscovery(

if (info.status == fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER)
{
logInfo(DDSROUTER_DISCOVERY,
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY,
"Found in Participant " << this->id_nts_() << " new Reader " << info.info.guid() << ".");

this->discovery_database_->add_endpoint(info_reader);
}
else if (info.status == fastrtps::rtps::ReaderDiscoveryInfo::CHANGED_QOS_READER)
{
logInfo(DDSROUTER_DISCOVERY, "Reader " << info.info.guid() << " changed QoS.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Reader " << info.info.guid() << " changed QoS.");

this->discovery_database_->update_endpoint(info_reader);
}
else if (info.status == fastrtps::rtps::ReaderDiscoveryInfo::REMOVED_READER)
{
logInfo(DDSROUTER_DISCOVERY, "Reader " << info.info.guid() << " removed.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Reader " << info.info.guid() << " removed.");

info_reader.active(false);
this->discovery_database_->update_endpoint(info_reader);
}
else
{
logInfo(DDSROUTER_DISCOVERY, "Reader " << info.info.guid() << " dropped.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Reader " << info.info.guid() << " dropped.");

info_reader.active(false);
this->discovery_database_->update_endpoint(info_reader);
Expand All @@ -180,27 +197,27 @@ void CommonParticipant::onWriterDiscovery(

if (info.status == fastrtps::rtps::WriterDiscoveryInfo::DISCOVERED_WRITER)
{
logInfo(DDSROUTER_DISCOVERY,
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY,
"Found in Participant " << this->id_nts_() << " new Writer " << info.info.guid() << ".");

this->discovery_database_->add_endpoint(info_writer);
}
else if (info.status == fastrtps::rtps::WriterDiscoveryInfo::CHANGED_QOS_WRITER)
{
logInfo(DDSROUTER_DISCOVERY, "Writer " << info.info.guid() << " changed QoS.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Writer " << info.info.guid() << " changed QoS.");

this->discovery_database_->update_endpoint(info_writer);
}
else if (info.status == fastrtps::rtps::WriterDiscoveryInfo::REMOVED_WRITER)
{
logInfo(DDSROUTER_DISCOVERY, "Writer " << info.info.guid() << " removed.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Writer " << info.info.guid() << " removed.");

info_writer.active(false);
this->discovery_database_->update_endpoint(info_writer);
}
else
{
logInfo(DDSROUTER_DISCOVERY, "Writer " << info.info.guid() << " dropped.");
logInfo(DDSROUTER_RTPSPARTICIPANT_DISCOVERY, "Writer " << info.info.guid() << " dropped.");

info_writer.active(false);
this->discovery_database_->update_endpoint(info_writer);
Expand Down Expand Up @@ -245,6 +262,7 @@ std::shared_ptr<IWriter> CommonParticipant::create_writer_(
this->payload_pool_,
rtps_participant_,
max_history_depth_,
target_guids_writer_filter_,
this->configuration_->is_repeater);
}

Expand All @@ -258,6 +276,13 @@ std::shared_ptr<IReader> CommonParticipant::create_reader_(
rtps_participant_);
}

void CommonParticipant::add_filter_guidprefix_(const types::GuidPrefix& guid_to_filter) noexcept
{
// Lock to write
std::unique_lock<types::GuidPrefixDataFilterType> lock(*target_guids_writer_filter_);
target_guids_writer_filter_->insert(guid_to_filter);
}

fastrtps::rtps::RTPSParticipantAttributes
CommonParticipant::participant_attributes_(
const configuration::ParticipantConfiguration* participant_configuration)
Expand All @@ -267,9 +292,48 @@ CommonParticipant::participant_attributes_(
// Add Participant name
params.setName(participant_configuration->id.id_name().c_str());

// Set property so other Routers know the Participants belongs to a Router and its kind
eprosima::fastrtps::rtps::Property router_kind_property;
router_kind_property.name(std::string(ROUTER_PROPERTY_KIND_NAME_));
router_kind_property.value(std::string(types::PARTICIPANT_KIND_STRINGS[static_cast<int>(participant_configuration->kind)]));
router_kind_property.propagate(true);
// Add it to properties
params.properties.properties().push_back(router_kind_property);

// Set property so other Routers know the Participant is local or wan
eprosima::fastrtps::rtps::Property router_positioning_property;
router_positioning_property.name(std::string(ROUTER_PROPERTY_POSITIONING_NAME_));
router_positioning_property.value(
std::string(
(participant_configuration->kind == types::ParticipantKind::local_discovery_server ||
participant_configuration->kind == types::ParticipantKind::simple_rtps)
? ROUTER_PROPERTY_POSITIONING_VALUE_LOCAL_
: ROUTER_PROPERTY_POSITIONING_VALUE_WAN_));
router_positioning_property.propagate(true);
// Add it to properties
params.properties.properties().push_back(router_positioning_property);

return params;
}

bool CommonParticipant::is_local_ddsrouter_participant_(const fastrtps::rtps::ParticipantDiscoveryInfo& info) noexcept
{

// Check if router positioning property exist
for (auto pit = info.info.m_properties.begin();
pit != info.info.m_properties.end();
++pit)
{
// Attribute found
if (pit->first() == std::string(ROUTER_PROPERTY_POSITIONING_NAME_))
{
return pit->second() == std::string(ROUTER_PROPERTY_POSITIONING_VALUE_LOCAL_);
}
}

return false;
}

} /* namespace rtps */
} /* namespace core */
} /* namespace ddsrouter */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ class CommonParticipant
types::Endpoint create_endpoint_from_info_(
DiscoveryInfoKind& info);

void add_filter_guidprefix_(const types::GuidPrefix& guid_to_filter) noexcept;

/////
// RTPS specific methods

static fastrtps::rtps::RTPSParticipantAttributes participant_attributes_(
const configuration::ParticipantConfiguration* participant_configuration);

static bool is_local_ddsrouter_participant_(const fastrtps::rtps::ParticipantDiscoveryInfo& info) noexcept;

/////
// VARIABLES
eprosima::fastrtps::rtps::RTPSParticipant* rtps_participant_;
Expand All @@ -101,6 +105,21 @@ class CommonParticipant

//! Maximum depth of RTPS History instances
unsigned int max_history_depth_;

/**
* @brief Collects these guid prefixes that the Writer must filter and not send messages to them.
*
* It does add also this same Participant.
*/
std::shared_ptr<types::GuidPrefixDataFilterType> target_guids_writer_filter_;

/*
* These constants set the name of property and values used in Property QoS
*/
static constexpr const char* ROUTER_PROPERTY_KIND_NAME_ = "fastdds.ddsrouter.kind";
static constexpr const char* ROUTER_PROPERTY_POSITIONING_NAME_ = "fastdds.ddsrouter.positioning";
static constexpr const char* ROUTER_PROPERTY_POSITIONING_VALUE_LOCAL_ = "local";
static constexpr const char* ROUTER_PROPERTY_POSITIONING_VALUE_WAN_ = "wan";
};

} /* namespace rtps */
Expand All @@ -109,3 +128,4 @@ class CommonParticipant
} /* namespace eprosima */

#endif /* __SRC_DDSROUTERCORE_PARTICIPANT_IMPLEMENTATIONS_RTPS_COMMONPARTICIPANT_HPP_ */

14 changes: 7 additions & 7 deletions ddsrouter_core/src/cpp/reader/implementations/rtps/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,16 @@ void Reader::enable_() noexcept
on_data_available_();
}

bool Reader::come_from_this_participant_(
bool Reader::accept_message_from_this_source_(
const fastrtps::rtps::CacheChange_t* change) const noexcept
{
return come_from_this_participant_(change->writerGUID);
return accept_message_from_this_source_(change->writerGUID);
}

bool Reader::come_from_this_participant_(
bool Reader::accept_message_from_this_source_(
const fastrtps::rtps::GUID_t guid) const noexcept
{
return guid.guidPrefix == rtps_reader_->getGuid().guidPrefix;
return true;
}

fastrtps::rtps::HistoryAttributes Reader::history_attributes_() const noexcept
Expand Down Expand Up @@ -255,7 +255,7 @@ void Reader::onNewCacheChangeAdded(
fastrtps::rtps::RTPSReader*,
const fastrtps::rtps::CacheChange_t* const change) noexcept
{
if (!come_from_this_participant_(change))
if (accept_message_from_this_source_(change))
{
// Do not remove previous received changes so they can be read when the reader is enabled
if (enabled_)
Expand All @@ -279,7 +279,7 @@ void Reader::onNewCacheChangeAdded(
{
logWarning(
DDSROUTER_RTPS_READER_LISTENER,
"Ignoring data from this same Participant in reader " << *this << ".");
"Ignoring data from " << change->writerGUID << " in reader " << *this << ".");

// If it is a message from this Participant, do not send it forward and remove it
// TODO: do this more elegant
Expand All @@ -291,7 +291,7 @@ void Reader::onReaderMatched(
fastrtps::rtps::RTPSReader*,
fastrtps::rtps::MatchingInfo& info) noexcept
{
if (!come_from_this_participant_(info.remoteEndpointGuid))
if (accept_message_from_this_source_(info.remoteEndpointGuid))
{
if (info.status == fastrtps::rtps::MatchingStatus::MATCHED_MATCHING)
{
Expand Down
8 changes: 4 additions & 4 deletions ddsrouter_core/src/cpp/reader/implementations/rtps/Reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
#include <fastrtps/rtps/reader/RTPSReader.h>
#include <fastrtps/rtps/reader/ReaderListener.h>

#include <reader/implementations/auxiliar/BaseReader.hpp>

#include <ddsrouter_core/types/participant/ParticipantId.hpp>

#include <reader/implementations/auxiliar/BaseReader.hpp>

namespace eprosima {
namespace ddsrouter {
namespace core {
Expand Down Expand Up @@ -160,11 +160,11 @@ class Reader : public BaseReader, public fastrtps::rtps::ReaderListener
// Reader specific methods

//! Whether a change received is from this Participant (to avoid auto-feedback)
bool come_from_this_participant_(
bool accept_message_from_this_source_(
const fastrtps::rtps::CacheChange_t* change) const noexcept;

//! Whether a guid references this Participant (to avoid auto-feedback)
bool come_from_this_participant_(
bool accept_message_from_this_source_(
const fastrtps::rtps::GUID_t guid) const noexcept;

/////
Expand Down
8 changes: 5 additions & 3 deletions ddsrouter_core/src/cpp/writer/implementations/rtps/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <efficiency/cache_change/CacheChangePool.hpp>
#include <writer/implementations/rtps/Writer.hpp>
#include <writer/implementations/rtps/filter/RepeaterDataFilter.hpp>
#include <writer/implementations/rtps/filter/SelfDataFilter.hpp>
#include <writer/implementations/rtps/filter/GuidDataFilter.hpp>
#include <types/dds/RouterCacheChange.hpp>

namespace eprosima {
Expand All @@ -41,9 +41,11 @@ Writer::Writer(
std::shared_ptr<PayloadPool> payload_pool,
fastrtps::rtps::RTPSParticipant* rtps_participant,
unsigned int max_history_depth,
std::shared_ptr<types::GuidPrefixDataFilterType> target_guids_filter,
const bool repeater /* = false */)
: BaseWriter(participant_id, topic, payload_pool)
, repeater_(repeater)
, target_guids_filter_(target_guids_filter)
{
// TODO Use payload pool for this writer, so change does not need to be copied

Expand Down Expand Up @@ -100,12 +102,12 @@ Writer::Writer(
if (repeater)
{
// Use filter writer of origin
data_filter_ = std::make_unique<RepeaterDataFilter>();
data_filter_ = std::make_unique<RepeaterDataFilter>(target_guids_filter);
}
else
{
// Use default filter
data_filter_ = std::make_unique<SelfDataFilter>();
data_filter_ = std::make_unique<GuidDataFilter>(target_guids_filter);
}

rtps_writer_->reader_data_filter(data_filter_.get());
Expand Down
Loading