diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp index 8c78e78a..6f531a61 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp @@ -90,8 +90,8 @@ RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_event_set_callback( rmw_event_t * event, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * Info API @@ -435,15 +435,15 @@ RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_service_set_on_new_request_callback( rmw_service_t * rmw_service, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_client_set_on_new_response_callback( rmw_client_t * rmw_client, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * Subscription API @@ -560,9 +560,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription( RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_subscription_set_on_new_message_callback( - rmw_subscription_t * rmw_subscription, - rmw_event_callback_t callback, - const void * user_data); + rmw_subscription_t * const rmw_subscription, + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * WaitSet API diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 09c911e4..02fa9b94 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -333,8 +333,44 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition virtual bool has_status(const rmw_event_type_t event_type) = 0; + void + notify_new_event() + { + std::unique_lock lock_mutex(new_event_mutex_); + if (new_event_cb_) { + new_event_cb_(user_data_, 1); + } else { + unread_events_count_++; + } + } + + void + set_new_event_callback( + rmw_event_callback_t callback, + const void * user_data) + { + std::unique_lock lock_mutex(new_event_mutex_); + + if (callback) { + // Push events arrived before setting the executor's callback + if (unread_events_count_) { + callback(user_data, unread_events_count_); + unread_events_count_ = 0; + } + user_data_ = user_data; + new_event_cb_ = callback; + } else { + user_data_ = nullptr; + new_event_cb_ = nullptr; + } + } + protected: DDS_StatusCondition * scond; + std::mutex new_event_mutex_; + rmw_event_callback_t new_event_cb_{nullptr}; + const void * user_data_{nullptr}; + uint64_t unread_events_count_ = 0; }; void @@ -589,6 +625,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition update_state( [this, available]() { this->triggered_data = available; + if (available) { + this->notify_new_data(); + } }, true /* notify */); if (nullptr != this->loan_guard_condition) { @@ -712,6 +751,34 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition return RMW_RET_OK; } + void set_on_new_data_callback( + const rmw_event_callback_t callback, + const void * const user_data) + { + std::unique_lock lock(new_data_event_mutex_); + if (callback) { + if (unread_data_events_count_ > 0) { + callback(user_data, unread_data_events_count_); + unread_data_events_count_ = 0; + new_data_event_cb_ = callback; + data_event_user_data_ = user_data; + } + } else { + new_data_event_cb_ = nullptr; + data_event_user_data_ = nullptr; + } + } + + void notify_new_data() + { + std::unique_lock lock_mutex(new_data_event_mutex_); + if (new_data_event_cb_) { + new_data_event_cb_(data_event_user_data_, 1); + } else { + unread_data_events_count_++; + } + } + protected: void update_status_deadline( const DDS_RequestedDeadlineMissedStatus * const status); @@ -745,6 +812,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition RMW_Connext_Subscriber * sub; + std::mutex new_data_event_mutex_; + rmw_event_callback_t new_data_event_cb_{nullptr}; + const void * data_event_user_data_{nullptr}; + uint64_t unread_data_events_count_ = 0; + friend class RMW_Connext_WaitSet; }; diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index c8e10121..0bbb5155 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -706,6 +706,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; + + this->notify_new_event(); } void @@ -720,6 +722,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness( this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count; this->status_liveliness.not_alive_count_change -= this->status_liveliness_last.not_alive_count; + + this->notify_new_event(); } void @@ -852,6 +856,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; + + this->notify_new_event(); } void @@ -863,6 +869,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness( this->status_liveliness.total_count_change = this->status_liveliness.total_count; this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count; + + this->notify_new_event(); } void diff --git a/rmw_connextdds_common/src/common/rmw_listener.cpp b/rmw_connextdds_common/src/common/rmw_listener.cpp index 730eaa92..d0a7958a 100644 --- a/rmw_connextdds_common/src/common/rmw_listener.cpp +++ b/rmw_connextdds_common/src/common/rmw_listener.cpp @@ -19,15 +19,25 @@ ******************************************************************************/ rmw_ret_t rmw_api_connextdds_event_set_callback( - rmw_event_t * event, - rmw_event_callback_t callback, - const void * user_data) + rmw_event_t * const event, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(event); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_event_set_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(event, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + event, + event->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_StatusCondition * condition = nullptr; + if (RMW_Connext_Event::reader_event(event)) { + condition = RMW_Connext_Event::subscriber(event)->condition(); + } else { + condition = RMW_Connext_Event::publisher(event)->condition(); + } + condition->set_new_event_callback(callback, user_data); + return RMW_RET_OK; } /****************************************************************************** @@ -35,28 +45,40 @@ rmw_api_connextdds_event_set_callback( ******************************************************************************/ rmw_ret_t rmw_api_connextdds_service_set_on_new_request_callback( - rmw_service_t * rmw_service, - rmw_event_callback_t callback, - const void * user_data) + rmw_service_t * const service, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_service); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_service_set_on_new_request_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Service * const svc_impl = + reinterpret_cast(service->data); + svc_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } rmw_ret_t rmw_api_connextdds_client_set_on_new_response_callback( - rmw_client_t * rmw_client, - rmw_event_callback_t callback, - const void * user_data) + rmw_client_t * const client, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_client); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_client_set_on_new_response_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Client * const client_impl = + reinterpret_cast(client->data); + client_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } /****************************************************************************** @@ -64,13 +86,19 @@ rmw_api_connextdds_client_set_on_new_response_callback( ******************************************************************************/ rmw_ret_t rmw_api_connextdds_subscription_set_on_new_message_callback( - rmw_subscription_t * rmw_subscription, - rmw_event_callback_t callback, - const void * user_data) + rmw_subscription_t * const subscription, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_subscription); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_subscription_set_on_new_message_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + sub_impl->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; }