Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mitigate discovery race condition between clients and services (v2) [ROS2-48] #153

Draft
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#include "rmw/rmw.h"
#include "rosidl_typesupport_cpp/message_type_support.hpp"

#ifndef DDS_GUID_INITIALIZER
#define DDS_GUID_INITIALIZER DDS_GUID_DEFAULT
#endif /* DDS_GUID_INITIALIZER */

class RMW_Connext_MessageTypeSupport;
class RMW_Connext_Publisher;
class RMW_Connext_Subscriber;
Expand Down Expand Up @@ -292,4 +296,9 @@ rmw_connextdds_get_cft_filter_expression(
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * const options);

rmw_ret_t
rmw_connextdds_guid_to_instance_handle(
const struct DDS_GUID_t * const guid,
DDS_InstanceHandle_t * const instanceHandle);

#endif // RMW_CONNEXTDDS__DDS_API_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,13 @@
#define RMW_CONNEXT_LIMIT_KEEP_ALL_SAMPLES 1000
#endif /* RMW_CONNEXT_LIMIT_SAMPLES_MAX */

#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT
#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT 100000 /* 100ms */
#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT */

#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE
#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE (31536000ull * 1000000ull) /* 1 year */
#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE */


#endif // RMW_CONNEXTDDS__RESOURCE_LIMITS_HPP_
107 changes: 102 additions & 5 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>

#include "rmw_connextdds/context.hpp"
#include "rmw_connextdds/type_support.hpp"
Expand Down Expand Up @@ -70,6 +71,28 @@ bool rmw_connextdds_find_string_in_list(
DDS_Duration_t rmw_connextdds_duration_from_ros_time(
const rmw_time_t * const ros_time);

class RMW_Connext_OrderedGid
{
public:
explicit RMW_Connext_OrderedGid(const rmw_gid_t & value)
: value(value)
{
}

bool operator<(const RMW_Connext_OrderedGid & other) const
{
return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) < 0;
}

bool operator==(const RMW_Connext_OrderedGid & other) const
{
return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) == 0;
}

private:
rmw_gid_t value;
};

/******************************************************************************
* WaitSet wrapper
******************************************************************************/
Expand Down Expand Up @@ -157,7 +180,7 @@ class RMW_Connext_Publisher
RMW_Connext_WriteParams * const params);

rmw_ret_t
enable() const
enable()
{
if (DDS_RETCODE_OK !=
DDS_Entity_enable(
Expand All @@ -183,6 +206,8 @@ class RMW_Connext_Publisher
return RMW_RET_ERROR;
}

max_blocking_time = load_max_blocking_time();

return RMW_RET_OK;
}

Expand Down Expand Up @@ -215,6 +240,12 @@ class RMW_Connext_Publisher
DDS_SampleIdentity_t * const sample_identity,
DDS_SampleIdentity_t * const related_sample_identity);

rmw_ret_t
wait_for_subscription(
rmw_gid_t & reader_gid,
bool & unmatched,
rmw_gid_t & related_writer_gid);

DDS_Topic * dds_topic() const
{
return DDS_DataWriter_get_topic(this->dds_writer);
Expand All @@ -232,19 +263,82 @@ class RMW_Connext_Publisher
return DDS_Publisher_get_participant(pub);
}

void
push_related_endpoints(const rmw_gid_t & endpoint, const rmw_gid_t & related)
{
std::lock_guard<std::mutex> lock(matched_mutex);
known_endpoints.emplace(RMW_Connext_OrderedGid(endpoint), related);
known_endpoints.emplace(RMW_Connext_OrderedGid(related), endpoint);
}

void
on_publication_matched(
const DDS_PublicationMatchedStatus * const status)
{
std::lock_guard<std::mutex> lock(matched_mutex);
DDS_ReturnCode_t dds_rc =
DDS_DataWriter_get_matched_subscriptions(writer(), &matched_subscriptions);
if (DDS_RETCODE_OK != dds_rc) {
RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched subscriptions: dds_rc=%d", dds_rc)
}
if (status->current_count_change < 0) {
rmw_gid_t unmatched_gid;
rmw_connextdds_ih_to_gid(status->last_subscription_handle, unmatched_gid);
pop_related_endpoints(unmatched_gid);
}
matched_cv.notify_all();
}

void
on_related_subscription_matched(
RMW_Connext_Subscriber * const sub,
const DDS_SubscriptionMatchedStatus * const status)
{
UNUSED_ARG(sub);
std::lock_guard<std::mutex> lock(matched_mutex);
if (status->current_count_change < 0) {
rmw_gid_t unmatched_gid;
rmw_connextdds_ih_to_gid(status->last_publication_handle, unmatched_gid);
pop_related_endpoints(unmatched_gid);
}
matched_cv.notify_all();
}

rmw_context_impl_t * const ctx;

~RMW_Connext_Publisher();

private:
rmw_context_impl_t * ctx;
DDS_DataWriter * dds_writer;
RMW_Connext_MessageTypeSupport * type_support;
const bool created_topic;
rmw_gid_t ros_gid;
RMW_Connext_PublisherStatusCondition status_condition;
std::mutex matched_mutex;
std::condition_variable matched_cv;
std::chrono::microseconds max_blocking_time;
std::map<RMW_Connext_OrderedGid, rmw_gid_t> known_endpoints;
DDS_InstanceHandleSeq matched_subscriptions;

RMW_Connext_Publisher(
rmw_context_impl_t * const ctx,
DDS_DataWriter * const dds_writer,
RMW_Connext_MessageTypeSupport * const type_support,
const bool created_topic);

void
pop_related_endpoints(const rmw_gid_t & endpoint)
{
auto endpoint_entry = known_endpoints.find(RMW_Connext_OrderedGid(endpoint));
if (endpoint_entry == known_endpoints.end()) {
return;
}
known_endpoints.erase(RMW_Connext_OrderedGid(endpoint));
known_endpoints.erase(RMW_Connext_OrderedGid(endpoint_entry->second));
}

std::chrono::microseconds
load_max_blocking_time() const;
};

rmw_publisher_t *
Expand Down Expand Up @@ -286,7 +380,8 @@ class RMW_Connext_Subscriber
const bool intro_members_cpp = false,
std::string * const type_name = nullptr,
const char * const cft_name = nullptr,
const char * const cft_filter = nullptr);
const char * const cft_filter = nullptr,
RMW_Connext_Publisher * const related_pub = nullptr);

rmw_ret_t
finalize();
Expand Down Expand Up @@ -521,7 +616,7 @@ class RMW_Connext_Subscriber
const bool ignore_local;

private:
rmw_context_impl_t * ctx;
rmw_context_impl_t * const ctx;
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
Expand All @@ -535,6 +630,7 @@ class RMW_Connext_Subscriber
size_t loan_len;
size_t loan_next;
std::mutex loan_mutex;
RMW_Connext_Publisher * const related_pub;

RMW_Connext_Subscriber(
rmw_context_impl_t * const ctx,
Expand All @@ -545,7 +641,8 @@ class RMW_Connext_Subscriber
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);
const bool internal,
RMW_Connext_Publisher * const related_pub);

friend class RMW_Connext_SubscriberStatusCondition;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
DDS_DataReader * reader);

rmw_ret_t
install(RMW_Connext_Subscriber * const sub);
install(
RMW_Connext_Subscriber * const sub,
RMW_Connext_Publisher * const related_pub = nullptr);

void
on_requested_deadline_missed(
Expand Down Expand Up @@ -909,7 +911,8 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
DDS_SampleLostStatus status_sample_lost_last;
DDS_SubscriptionMatchedStatus status_matched_last;

RMW_Connext_Subscriber * sub;
RMW_Connext_Subscriber * sub{nullptr};
RMW_Connext_Publisher * related_pub{nullptr};

std::mutex new_data_event_mutex_;
rmw_event_callback_t new_data_event_cb_{nullptr};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct RMW_Connext_RequestReplyMessage
{
bool request;
rmw_gid_t gid;
rmw_gid_t writer_gid;
int64_t sn;
void * payload;
};
Expand Down
Loading