Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Free the changes that have been acknowledged by all readers (on volatile topics) [19800] #71

Merged
merged 6 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class CommonWriter : public BaseWriter, public fastrtps::rtps::WriterListener
/**
* @brief CommonWriter Listener callback when a new Reader is matched or unmatched
*
* This method is call every time a new Reader is matched or unmatched from this CommonWriter.
* This method is called every time a new Reader is matched or unmatched from this CommonWriter.
* It only creates a log for matching and unmatching (in case it is not a reader from this same Participant)
*
* @param [in] info information about the matched Reader
Expand All @@ -108,6 +108,19 @@ class CommonWriter : public BaseWriter, public fastrtps::rtps::WriterListener
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::MatchingInfo& info) noexcept override;

/**
* @brief CommonWriter Listener callback when all the Readers have received a change.
*
* This method is called when all the Readers subscribed to a Topic acknowledge that they have received a change.
* It removes the change from the Writer's history if the Writer is best-effort or volatile.
*
* @param [in] ch the change that has been acknowledged by all the Readers.
*/
DDSPIPE_PARTICIPANTS_DllAPI
void onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change) override;

/**
* This method is called when a new Reader is discovered, with a Topic that
* matches that of a local writer, but with a requested QoS that is incompatible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,11 @@ CommonParticipant::CommonParticipant(
fastdds::dds::DomainParticipantQos CommonParticipant::reckon_participant_qos_() const
{
auto qos = fastdds::dds::DomainParticipantFactory::get_instance()->get_default_participant_qos();

qos.properties().properties().emplace_back(
"fastdds.ignore_local_endpoints",
"true");

return qos;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ CommonParticipant::reckon_participant_attributes_(
// Add Participant name
params.setName(participant_configuration->id.c_str());

// Ignore the local endpoints so that the reader and writer of the same participant don't match.
params.properties.properties().emplace_back(
"fastdds.ignore_local_endpoints",
"true");

return params;
}

Expand Down
39 changes: 22 additions & 17 deletions ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ void CommonWriter::onWriterMatched(
}
}

void CommonWriter::onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter* /*writer*/,
fastrtps::rtps::CacheChange_t* change)
{
if (writer_qos_.m_reliability.kind == fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ||
writer_qos_.m_durability.kind == fastdds::dds::VOLATILE_DURABILITY_QOS)
{
rtps_history_->remove_change_g(change);
}
}

void CommonWriter::on_offered_incompatible_qos(
fastrtps::rtps::RTPSWriter*,
eprosima::fastdds::dds::PolicyMask qos) noexcept
Expand All @@ -147,10 +158,10 @@ utils::ReturnCode CommonWriter::write_nts_(

// Take new Change from history
fastrtps::rtps::CacheChange_t* new_change;

if (topic_.topic_qos.keyed)
{
new_change =
rtps_writer_->new_change(
new_change = rtps_writer_->new_change(
rtps_data.kind,
rtps_data.instanceHandle);
}
Expand Down Expand Up @@ -180,24 +191,20 @@ utils::ReturnCode CommonWriter::write_nts_(
return ret;
}

if (rtps_history_->isFull())
juanlofer-eprosima marked this conversation as resolved.
Show resolved Hide resolved
{
// Remove the oldest cache change when the max history size is reached.
rtps_history_->remove_min_change();
}

// Send data by adding it to CommonWriter History
rtps_history_->add_change(new_change, write_params);

// In the case of BEST_EFFORT, add_change calls onWriterChangeReceivedByAll (which removes the change).

// At this point, write params is now the output of adding change
fill_sent_data_(write_params, rtps_data);

// Remove change could be done here in non reliable as it is synchronous because change has already been sent
// and does not require to be resent under any circumstance.
if (!topic_.topic_qos.is_reliable())
{
rtps_history_->remove_change(new_change);
}
else if (rtps_history_->isFull())
{
// When max history size is reached, remove oldest cache change
rtps_history_->remove_min_change();
}

return utils::ReturnCode::RETCODE_OK;
}

Expand Down Expand Up @@ -247,9 +254,7 @@ void CommonWriter::fill_sent_data_(
const eprosima::fastrtps::rtps::WriteParams& params,
core::types::RtpsPayloadData& data_to_fill) const noexcept
{
// Set data output parameters
// TODO move to RPC
// data_to_fill->sent_sequence_number = params.sample_identity().sequence_number();
// Do nothing
}

void CommonWriter::internal_entities_creation_(
Expand Down
Loading