diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index 55a3a0b9..f09caf33 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -33,6 +33,10 @@ #include "rmw/rmw.h" #include "rosidl_typesupport_cpp/message_type_support.hpp" +#ifndef DDS_GUID_INITIALIZER +#define DDS_GUID_INITIALIZER DDS_GUID_DEFAULT +#endif /* DDS_GUID_INITIALIZER */ + class RMW_Connext_MessageTypeSupport; class RMW_Connext_Publisher; class RMW_Connext_Subscriber; @@ -292,4 +296,9 @@ rmw_connextdds_get_cft_filter_expression( rcutils_allocator_t * const allocator, rmw_subscription_content_filter_options_t * const options); +rmw_ret_t +rmw_connextdds_guid_to_instance_handle( + const struct DDS_GUID_t * const guid, + DDS_InstanceHandle_t * const instanceHandle); + #endif // RMW_CONNEXTDDS__DDS_API_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp b/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp index f091e2b2..f7bd4ae9 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp @@ -55,4 +55,13 @@ #define RMW_CONNEXT_LIMIT_KEEP_ALL_SAMPLES 1000 #endif /* RMW_CONNEXT_LIMIT_SAMPLES_MAX */ +#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT +#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT 100000 /* 100ms */ +#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT */ + +#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE +#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE (31536000ull * 1000000ull) /* 1 year */ +#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE */ + + #endif // RMW_CONNEXTDDS__RESOURCE_LIMITS_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 56fffdad..943d5f8b 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "rmw_connextdds/context.hpp" #include "rmw_connextdds/type_support.hpp" @@ -70,6 +71,28 @@ bool rmw_connextdds_find_string_in_list( DDS_Duration_t rmw_connextdds_duration_from_ros_time( const rmw_time_t * const ros_time); +class RMW_Connext_OrderedGid +{ +public: + explicit RMW_Connext_OrderedGid(const rmw_gid_t & value) + : value(value) + { + } + + bool operator<(const RMW_Connext_OrderedGid & other) const + { + return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) < 0; + } + + bool operator==(const RMW_Connext_OrderedGid & other) const + { + return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) == 0; + } + +private: + rmw_gid_t value; +}; + /****************************************************************************** * WaitSet wrapper ******************************************************************************/ @@ -157,7 +180,7 @@ class RMW_Connext_Publisher RMW_Connext_WriteParams * const params); rmw_ret_t - enable() const + enable() { if (DDS_RETCODE_OK != DDS_Entity_enable( @@ -183,6 +206,8 @@ class RMW_Connext_Publisher return RMW_RET_ERROR; } + max_blocking_time = load_max_blocking_time(); + return RMW_RET_OK; } @@ -215,6 +240,12 @@ class RMW_Connext_Publisher DDS_SampleIdentity_t * const sample_identity, DDS_SampleIdentity_t * const related_sample_identity); + rmw_ret_t + wait_for_subscription( + rmw_gid_t & reader_gid, + bool & unmatched, + rmw_gid_t & related_writer_gid); + DDS_Topic * dds_topic() const { return DDS_DataWriter_get_topic(this->dds_writer); @@ -232,19 +263,82 @@ class RMW_Connext_Publisher return DDS_Publisher_get_participant(pub); } + void + push_related_endpoints(const rmw_gid_t & endpoint, const rmw_gid_t & related) + { + std::lock_guard lock(matched_mutex); + known_endpoints.emplace(RMW_Connext_OrderedGid(endpoint), related); + known_endpoints.emplace(RMW_Connext_OrderedGid(related), endpoint); + } + + void + on_publication_matched( + const DDS_PublicationMatchedStatus * const status) + { + std::lock_guard lock(matched_mutex); + DDS_ReturnCode_t dds_rc = + DDS_DataWriter_get_matched_subscriptions(writer(), &matched_subscriptions); + if (DDS_RETCODE_OK != dds_rc) { + RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched subscriptions: dds_rc=%d", dds_rc) + } + if (status->current_count_change < 0) { + rmw_gid_t unmatched_gid; + rmw_connextdds_ih_to_gid(status->last_subscription_handle, unmatched_gid); + pop_related_endpoints(unmatched_gid); + } + matched_cv.notify_all(); + } + + void + on_related_subscription_matched( + RMW_Connext_Subscriber * const sub, + const DDS_SubscriptionMatchedStatus * const status) + { + UNUSED_ARG(sub); + std::lock_guard lock(matched_mutex); + if (status->current_count_change < 0) { + rmw_gid_t unmatched_gid; + rmw_connextdds_ih_to_gid(status->last_publication_handle, unmatched_gid); + pop_related_endpoints(unmatched_gid); + } + matched_cv.notify_all(); + } + + rmw_context_impl_t * const ctx; + + ~RMW_Connext_Publisher(); + private: - rmw_context_impl_t * ctx; DDS_DataWriter * dds_writer; RMW_Connext_MessageTypeSupport * type_support; const bool created_topic; rmw_gid_t ros_gid; RMW_Connext_PublisherStatusCondition status_condition; + std::mutex matched_mutex; + std::condition_variable matched_cv; + std::chrono::microseconds max_blocking_time; + std::map known_endpoints; + DDS_InstanceHandleSeq matched_subscriptions; RMW_Connext_Publisher( rmw_context_impl_t * const ctx, DDS_DataWriter * const dds_writer, RMW_Connext_MessageTypeSupport * const type_support, const bool created_topic); + + void + pop_related_endpoints(const rmw_gid_t & endpoint) + { + auto endpoint_entry = known_endpoints.find(RMW_Connext_OrderedGid(endpoint)); + if (endpoint_entry == known_endpoints.end()) { + return; + } + known_endpoints.erase(RMW_Connext_OrderedGid(endpoint)); + known_endpoints.erase(RMW_Connext_OrderedGid(endpoint_entry->second)); + } + + std::chrono::microseconds + load_max_blocking_time() const; }; rmw_publisher_t * @@ -286,7 +380,8 @@ class RMW_Connext_Subscriber const bool intro_members_cpp = false, std::string * const type_name = nullptr, const char * const cft_name = nullptr, - const char * const cft_filter = nullptr); + const char * const cft_filter = nullptr, + RMW_Connext_Publisher * const related_pub = nullptr); rmw_ret_t finalize(); @@ -521,7 +616,7 @@ class RMW_Connext_Subscriber const bool ignore_local; private: - rmw_context_impl_t * ctx; + rmw_context_impl_t * const ctx; DDS_DataReader * dds_reader; DDS_Topic * dds_topic; DDS_TopicDescription * dds_topic_cft; @@ -535,6 +630,7 @@ class RMW_Connext_Subscriber size_t loan_len; size_t loan_next; std::mutex loan_mutex; + RMW_Connext_Publisher * const related_pub; RMW_Connext_Subscriber( rmw_context_impl_t * const ctx, @@ -545,7 +641,8 @@ class RMW_Connext_Subscriber const bool created_topic, DDS_TopicDescription * const dds_topic_cft, const char * const cft_expression, - const bool internal); + const bool internal, + RMW_Connext_Publisher * const related_pub); friend class RMW_Connext_SubscriberStatusCondition; }; diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 45bcedf6..878b2698 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -680,7 +680,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition DDS_DataReader * reader); rmw_ret_t - install(RMW_Connext_Subscriber * const sub); + install( + RMW_Connext_Subscriber * const sub, + RMW_Connext_Publisher * const related_pub = nullptr); void on_requested_deadline_missed( @@ -909,7 +911,8 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition DDS_SampleLostStatus status_sample_lost_last; DDS_SubscriptionMatchedStatus status_matched_last; - RMW_Connext_Subscriber * sub; + RMW_Connext_Subscriber * sub{nullptr}; + RMW_Connext_Publisher * related_pub{nullptr}; std::mutex new_data_event_mutex_; rmw_event_callback_t new_data_event_cb_{nullptr}; diff --git a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp index 98c3bad2..09b8ae5f 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp @@ -45,6 +45,7 @@ struct RMW_Connext_RequestReplyMessage { bool request; rmw_gid_t gid; + rmw_gid_t writer_gid; int64_t sn; void * payload; }; diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 3deb1f14..a6c82e67 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -658,7 +658,8 @@ RMW_Connext_Publisher::RMW_Connext_Publisher( dds_writer(dds_writer), type_support(type_support), created_topic(created_topic), - status_condition(dds_writer) + status_condition(dds_writer), + matched_subscriptions(DDS_SEQUENCE_INITIALIZER) { rmw_connextdds_get_entity_gid(this->dds_writer, this->ros_gid); if (RMW_RET_OK != this->status_condition.install(this)) { @@ -667,6 +668,11 @@ RMW_Connext_Publisher::RMW_Connext_Publisher( } } +RMW_Connext_Publisher::~RMW_Connext_Publisher() +{ + DDS_InstanceHandleSeq_finalize(&matched_subscriptions); +} + RMW_Connext_Publisher * RMW_Connext_Publisher::create( rmw_context_impl_t * const ctx, @@ -884,10 +890,6 @@ RMW_Connext_Publisher::finalize() return RMW_RET_OK; } -#ifndef DDS_GUID_INITIALIZER -#define DDS_GUID_INITIALIZER DDS_GUID_DEFAULT -#endif /* DDS_GUID_INITIALIZER */ - rmw_ret_t RMW_Connext_Publisher::requestreply_header_to_dds( const RMW_Connext_RequestReplyMessage * const rr_msg, @@ -1020,6 +1022,97 @@ RMW_Connext_Publisher::qos(rmw_qos_profile_t * const qos) return rc; } +std::chrono::microseconds +RMW_Connext_Publisher::load_max_blocking_time() const +{ +#if !RMW_CONNEXT_DDS_API_PRO_LEGACY + DDS_DataWriterQos dw_qos = DDS_DataWriterQos_INITIALIZER; +#else + DDS_DataWriterQos dw_qos; + if (DDS_RETCODE_OK != DDS_DataWriterQos_initialize(&dw_qos)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datawriter qos") + return std::chrono::microseconds(0); + } +#endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ + + auto scope_exit_dw_qos_delete = + rcpputils::make_scope_exit( + [&dw_qos]() + { + if (DDS_RETCODE_OK != DDS_DataWriterQos_finalize(&dw_qos)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize DataWriterQoS") + } + }); + + if (DDS_DataWriter_get_qos(writer(), &dw_qos)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get datawriter qos") + return std::chrono::microseconds(RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT); + } + if (DDS_Duration_is_auto(&dw_qos.reliability.max_blocking_time)) { + return std::chrono::microseconds(RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT); + } + if (DDS_Duration_is_infinite(&dw_qos.reliability.max_blocking_time) || + DDS_Duration_is_auto(&dw_qos.reliability.max_blocking_time)) + { + return std::chrono::microseconds(RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE); + } + if (DDS_Duration_is_zero(&dw_qos.reliability.max_blocking_time)) { + return std::chrono::microseconds(0); + } + std::chrono::microseconds max_blocking_time = + std::chrono::duration_cast( + std::chrono::seconds(dw_qos.reliability.max_blocking_time.sec) + + std::chrono::nanoseconds(dw_qos.reliability.max_blocking_time.nanosec)); + + return max_blocking_time; +} + +rmw_ret_t +RMW_Connext_Publisher::wait_for_subscription( + rmw_gid_t & reader_gid, + bool & unmatched, + rmw_gid_t & related_writer_gid) +{ + unmatched = false; + + struct DDS_GUID_t reader_guid = DDS_GUID_INITIALIZER; + rmw_ret_t rc = RMW_RET_ERROR; + rc = rmw_connextdds_gid_to_guid(reader_gid, reader_guid); + if (RMW_RET_OK != rc) { + return rc; + } + + std::unique_lock lock(matched_mutex); + auto endpoint_entry = known_endpoints.find(RMW_Connext_OrderedGid(reader_gid)); + if (endpoint_entry == known_endpoints.end()) { + unmatched = true; + return RMW_RET_OK; + } + related_writer_gid = endpoint_entry->second; + + DDS_InstanceHandle_t reader_ih = DDS_HANDLE_NIL; + rc = rmw_connextdds_guid_to_instance_handle(&reader_guid, &reader_ih); + if (RMW_RET_OK != rc) { + return rc; + } + auto done_waiting = [this, &reader_ih]() { + bool matched = false; + const DDS_Long subs_len = DDS_InstanceHandleSeq_get_length(&matched_subscriptions); + for (DDS_Long i = 0; i < subs_len && !matched; i++) { + DDS_InstanceHandle_t * const matched_ih = + DDS_InstanceHandleSeq_get_reference(&matched_subscriptions, i); + matched = DDS_InstanceHandle_compare(matched_ih, &reader_ih) == 0; + } + return matched; + }; + + if (!matched_cv.wait_for(lock, this->max_blocking_time, done_waiting)) { + return RMW_RET_TIMEOUT; + } + + return RMW_RET_OK; +} + rmw_publisher_t * rmw_connextdds_create_publisher( rmw_context_impl_t * const ctx, @@ -1159,7 +1252,8 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( const bool created_topic, DDS_TopicDescription * const dds_topic_cft, const char * const cft_expression, - const bool internal) + const bool internal, + RMW_Connext_Publisher * const related_pub) : internal(internal), ignore_local(ignore_local), ctx(ctx), @@ -1169,7 +1263,8 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( cft_expression(cft_expression), type_support(type_support), created_topic(created_topic), - status_condition(dds_reader, ignore_local, internal) + status_condition(dds_reader, ignore_local, internal), + related_pub(related_pub) { rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); @@ -1180,7 +1275,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( this->loan_info = def_info_seq; this->loan_len = 0; this->loan_next = 0; - if (RMW_RET_OK != this->status_condition.install(this)) { + if (RMW_RET_OK != this->status_condition.install(this, this->related_pub)) { RMW_CONNEXT_LOG_ERROR("failed to install condition on reader") throw std::runtime_error("failed to install condition on reader"); } @@ -1201,7 +1296,8 @@ RMW_Connext_Subscriber::create( const bool intro_members_cpp, std::string * const type_name, const char * const cft_name, - const char * const cft_filter) + const char * const cft_filter, + RMW_Connext_Publisher * const related_pub) { RMW_Connext_MessageTypeSupport * const type_support = RMW_Connext_MessageTypeSupport::register_type_support( @@ -1390,7 +1486,8 @@ RMW_Connext_Subscriber::create( topic_created, cft_topic, sub_cft_expr, - internal); + internal, + related_pub); if (nullptr == rmw_sub_impl) { RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") @@ -1667,17 +1764,21 @@ RMW_Connext_Subscriber::requestreply_header_from_dds( { const struct DDS_GUID_t * src_guid = nullptr; const struct DDS_SequenceNumber_t * src_sn = nullptr; + const struct DDS_GUID_t * writer_guid = nullptr; if (rr_msg->request) { - src_guid = &sample_identity->writer_guid; + src_guid = &related_sample_identity->writer_guid; src_sn = &sample_identity->sequence_number; + writer_guid = &sample_identity->writer_guid; } else { src_guid = &related_sample_identity->writer_guid; src_sn = &related_sample_identity->sequence_number; + writer_guid = &sample_identity->writer_guid; } rmw_connextdds_guid_to_gid(*src_guid, rr_msg->gid); rmw_connextdds_sn_dds_to_ros(*src_sn, rr_msg->sn); + rmw_connextdds_guid_to_gid(*writer_guid, rr_msg->writer_gid); } rmw_ret_t @@ -2515,7 +2616,7 @@ RMW_Connext_Client::create( char * cft_name = nullptr, *cft_filter = nullptr; auto scope_exit_cft_name = rcpputils::make_scope_exit( - [cft_name, cft_filter]() + [&cft_name, &cft_filter]() { if (nullptr != cft_name) { DDS_String_free(cft_name); @@ -2662,9 +2763,11 @@ RMW_Connext_Client::take_response( if (taken_msg) { request_header->request_id.sequence_number = rr_msg.sn; + /* (asorbini) although messages are correlated using the reply DataReader's GUID, + we report the writer's GUID to upper layers. */ memcpy( request_header->request_id.writer_guid, - rr_msg.gid.data, + this->request_pub->gid()->data, 16); if (this->ctx->cyclone_compatible) { @@ -2706,10 +2809,15 @@ RMW_Connext_Client::send_request( if (this->ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Basic) { *sequence_id = ++this->next_request_id; rr_msg.sn = *sequence_id; + rr_msg.gid = *this->request_pub->gid(); } else { rr_msg.sn = -1; + /* (asorbini) use the reply DataReader's GUID to correlate request and reply, instead of the + request DataWriter's GUID, so that the service may try to wait for the DataReader to be + matched before writing the reply. */ + rr_msg.gid = *this->reply_sub->gid(); } - rr_msg.gid = *this->request_pub->gid(); + rr_msg.payload = const_cast(ros_request); RMW_CONNEXT_LOG_DEBUG_A( @@ -2923,7 +3031,13 @@ RMW_Connext_Service::create( RMW_CONNEXT_MESSAGE_REQUEST, svc_members_req, svc_members_req_cpp, - &request_type); + &request_type, + nullptr /* cft_name */, + nullptr /* cft_filter */, + /* If we are using the extended RPC mapping, then we cache the reply writer + so that we can notify it of "subscription_match" events. */ + (ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Extended ? + svc_impl->reply_pub : nullptr)); if (nullptr == svc_impl->request_sub) { RMW_CONNEXT_LOG_ERROR("failed to create service requester") @@ -2937,14 +3051,14 @@ RMW_Connext_Service::create( rmw_ret_t RMW_Connext_Service::enable() { - rmw_ret_t rc = this->reply_pub->enable(); + rmw_ret_t rc = this->request_sub->enable(); if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable service's publisher") + RMW_CONNEXT_LOG_ERROR("failed to enable service's subscription") return rc; } - rc = this->request_sub->enable(); + rc = this->reply_pub->enable(); if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable service's subscription") + RMW_CONNEXT_LOG_ERROR("failed to enable service's publisher") return rc; } return RMW_RET_OK; @@ -2984,6 +3098,11 @@ RMW_Connext_Service::take_request( request_header->source_timestamp = message_info.source_timestamp; request_header->received_timestamp = message_info.received_timestamp; + if (ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Extended) { + /* Cache the writer/reader GUIDs */ + reply_pub->push_related_endpoints(rr_msg.gid, rr_msg.writer_gid); + } + *taken = true; RMW_CONNEXT_LOG_DEBUG_A( @@ -3035,6 +3154,59 @@ RMW_Connext_Service::send_response( reinterpret_cast(rr_msg.gid.data)[3], rr_msg.sn) + /* (asorbini) The following logic tries to partially work around some race conditions that exists + in the way request/reply interactions between clients and services are mapped to DDS topics + and endpoints. + + It is possible for the service's request DataReader to receive a request from the client, + handle it, and try to write the response before the service's reply DataWriter has actually + matched the client's reply DataReader. If this is the case, the response will be lost + (unless the endpoints have been configured with "transient local" or higher Durability QoS, + but this is not the default configuration, and it is unlikely to be used in practice...). + The data race and loss of reply message may also occur in the case of an + "asymmetric discovery" between the client's reply DataReader and the service's reply + DataWriter, where the latter has discovered and matched the former, but not viceversa. + + The only way to fully resolve these data races is to have the underlying implementation force + a "match ordering" when dealing with pairs of endpoints used for RPC exchanges. The solution + is described by the OMG DDS-RPC 1.0 specification, section 7.6.2 (Enhanced Service Mapping). + It requires the DDS implementation to prevent the DataWriter from matching its remote + counterpart before the associated DataReader has matched. This behavior must be enforced + by both service and client for all data races to be resolved. + + This partial workaround relies on (ab)using the "related sample identity" field to communicate + the client's reply DataReader's GUID to the service. Normally, this field should would be + used to communicate the client's request DataWriter's GUID. + + When the service detects the presence of an unusual DataReader GUID, it will enter a + (time-bounded) loop to query the reply DataWriter's list of matched subscriptions, and wait + for the GUID to appear. + + If the reader is not matched within the timeout (HistoryQosPolicy::max_blocking_time), the + service will return RMW_RET_TIMEOUT to the upper layers. Otherwise, the response is written + out as normal. + + It is still possible for the response to be lost by the client, in case of an asymmetric + discovery, but this issue partially mitigated by the implementation of + Client::is_service_available. + */ + if (ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Extended) { + struct DDS_GUID_t src_guid = DDS_GUID_INITIALIZER; + rmw_ret_t rc = RMW_RET_ERROR; + rc = rmw_connextdds_gid_to_guid(rr_msg.gid, src_guid); + if (RMW_RET_OK != rc) { + return rc; + } + DDS_RTPS_GUID_t * const rtps_guid = DDS_GUID_as_rtps_guid(&src_guid); + if (rtps_guid->entityId.entityKind & 0x04) { + bool unmatched = false; + rc = reply_pub->wait_for_subscription(rr_msg.gid, unmatched, rr_msg.writer_gid); + if (RMW_RET_OK != rc || unmatched) { + return rc; + } + } + } + return this->reply_pub->write(&rr_msg, false /* serialized */, &write_params); } diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index d9a4759b..6f7f0405 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -614,7 +614,8 @@ RMW_Connext_StatusCondition::update_status_inconsistent_topic( rmw_ret_t RMW_Connext_SubscriberStatusCondition::install( - RMW_Connext_Subscriber * const sub) + RMW_Connext_Subscriber * const sub, + RMW_Connext_Publisher * const related_pub) { DDS_DataReaderListener listener = DDS_DataReaderListener_INITIALIZER; DDS_StatusMask listener_mask = DDS_STATUS_MASK_NONE; @@ -644,6 +645,9 @@ RMW_Connext_SubscriberStatusCondition::install( rmw_connextdds_configure_subscriber_condition_listener( this, &listener, &listener_mask); + this->sub = sub; + this->related_pub = related_pub; + if (DDS_RETCODE_OK != DDS_DataReader_set_listener(sub->reader(), &listener, listener_mask)) { @@ -662,8 +666,6 @@ RMW_Connext_SubscriberStatusCondition::install( return RMW_RET_ERROR; } - this->sub = sub; - return RMW_RET_OK; } @@ -883,6 +885,10 @@ RMW_Connext_SubscriberStatusCondition::update_status_matched( this->status_matched.current_count_change = this->status_matched.current_count - this->status_matched_last.current_count; + if (nullptr != this->sub && nullptr != this->related_pub) { + this->related_pub->on_related_subscription_matched(this->sub, status); + } + this->notify_new_event(RMW_EVENT_SUBSCRIPTION_MATCHED); } @@ -909,6 +915,8 @@ RMW_Connext_PublisherStatusCondition::install( DDS_LIVELINESS_LOST_STATUS | DDS_PUBLICATION_MATCHED_STATUS; + this->pub = pub; + if (DDS_RETCODE_OK != DDS_DataWriter_set_listener( pub->writer(), &listener, listener_mask)) @@ -934,8 +942,6 @@ RMW_Connext_PublisherStatusCondition::install( return RMW_RET_ERROR; } - this->pub = pub; - return RMW_RET_OK; } @@ -1075,5 +1081,9 @@ RMW_Connext_PublisherStatusCondition::update_status_matched( this->status_matched.current_count_change = this->status_matched.current_count - this->status_matched_last.current_count; + if (nullptr != this->pub) { + this->pub->on_publication_matched(status); + } + this->notify_new_event(RMW_EVENT_PUBLICATION_MATCHED); } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 42d6ad6b..d9fcd91d 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -30,6 +30,8 @@ #include "rmw_connextdds/rmw_impl.hpp" #include "rmw_connextdds/graph_cache.hpp" +#include "dds_c/dds_c_infrastructure_impl.h" + const char * const RMW_CONNEXTDDS_ID = "rmw_connextdds"; const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT = "cdr"; @@ -750,22 +752,20 @@ rmw_connextdds_write_message( const RMW_Connext_RequestReplyMessage * const rr_msg = reinterpret_cast(message->user_data); - if (!rr_msg->request) { - /* If this is a reply, propagate the request's sample identity - via the related_sample_identity field */ - rmw_ret_t rc = RMW_RET_ERROR; - - rmw_connextdds_sn_ros_to_dds( - rr_msg->sn, - write_params.related_sample_identity.sequence_number); + // Propagate the request's sample identity via the related_sample_identity field + int64_t sn_ros = rr_msg->sn >= 0 ? rr_msg->sn : 0; + rmw_connextdds_sn_ros_to_dds( + sn_ros, + write_params.related_sample_identity.sequence_number); + + rmw_ret_t rc = rmw_connextdds_gid_to_guid( + rr_msg->request ? rr_msg->gid : rr_msg->writer_gid, + write_params.related_sample_identity.writer_guid); + if (RMW_RET_OK != rc) { + return rc; + } - rc = rmw_connextdds_gid_to_guid( - rr_msg->gid, - write_params.related_sample_identity.writer_guid); - if (RMW_RET_OK != rc) { - return rc; - } - } else { + if (rr_msg->request) { // enable WriteParams::replace_auto to retrieve SN of published message write_params.replace_auto = DDS_BOOLEAN_TRUE; } @@ -1599,3 +1599,12 @@ rmw_connextdds_get_cft_filter_expression( return RMW_RET_OK; } + +rmw_ret_t +rmw_connextdds_guid_to_instance_handle( + const struct DDS_GUID_t * const guid, + DDS_InstanceHandle_t * const instance_handle) +{ + DDS_GUID_to_instance_handle(guid, instance_handle); + return RMW_RET_OK; +} diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index d0372f9a..332a8fc5 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -2041,3 +2041,13 @@ rmw_connextdds_get_cft_filter_expression( UNUSED_ARG(options); return RMW_RET_UNSUPPORTED; } + +rmw_ret_t +rmw_connextdds_guid_to_instance_handle( + const struct DDS_GUID_t * const guid, + DDS_InstanceHandle_t * const instance_handle) +{ + UNUSED_ARG(guid); + UNUSED_ARG(instance_handle); + return RMW_RET_UNSUPPORTED; +}