diff --git a/implementation/configuration/include/configuration.hpp b/implementation/configuration/include/configuration.hpp index db7944398..12b0576d5 100644 --- a/implementation/configuration/include/configuration.hpp +++ b/implementation/configuration/include/configuration.hpp @@ -187,7 +187,9 @@ class configuration { virtual uint8_t get_sd_repetitions_max() const = 0; virtual ttl_t get_sd_ttl() const = 0; virtual int32_t get_sd_cyclic_offer_delay() const = 0; - virtual int32_t get_sd_request_response_delay() const = 0; + virtual int32_t get_sd_cyclic_request_delay() const = 0; + virtual int32_t get_sd_request_response_delay_min() const = 0; + virtual int32_t get_sd_request_response_delay_max() const = 0; virtual uint8_t get_sd_find_initial_debounce_reps() const = 0; virtual std::uint32_t get_sd_find_initial_debounce_time() const = 0; virtual std::uint32_t get_sd_offer_debounce_time() const = 0; diff --git a/implementation/configuration/include/configuration_impl.hpp b/implementation/configuration/include/configuration_impl.hpp index 51893f142..422fc0c83 100644 --- a/implementation/configuration/include/configuration_impl.hpp +++ b/implementation/configuration/include/configuration_impl.hpp @@ -198,7 +198,9 @@ class configuration_impl: VSOMEIP_EXPORT uint8_t get_sd_repetitions_max() const; VSOMEIP_EXPORT ttl_t get_sd_ttl() const; VSOMEIP_EXPORT int32_t get_sd_cyclic_offer_delay() const; - VSOMEIP_EXPORT int32_t get_sd_request_response_delay() const; + VSOMEIP_EXPORT int32_t get_sd_cyclic_request_delay() const; + VSOMEIP_EXPORT int32_t get_sd_request_response_delay_min() const; + VSOMEIP_EXPORT int32_t get_sd_request_response_delay_max() const; VSOMEIP_EXPORT uint8_t get_sd_find_initial_debounce_reps() const; VSOMEIP_EXPORT std::uint32_t get_sd_find_initial_debounce_time() const; VSOMEIP_EXPORT std::uint32_t get_sd_offer_debounce_time() const; @@ -552,7 +554,9 @@ class configuration_impl: uint8_t sd_repetitions_max_; ttl_t sd_ttl_; int32_t sd_cyclic_offer_delay_; - int32_t sd_request_response_delay_; + int32_t sd_cyclic_request_delay_; + int32_t sd_request_response_delay_min_; + int32_t sd_request_response_delay_max_; std::uint32_t sd_offer_debounce_time_; std::uint32_t sd_find_debounce_time_; uint8_t sd_find_initial_debounce_reps_; @@ -601,7 +605,9 @@ class configuration_impl: ET_SERVICE_DISCOVERY_REPETITION_MAX, ET_SERVICE_DISCOVERY_TTL, ET_SERVICE_DISCOVERY_CYCLIC_OFFER_DELAY, - ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY, + ET_SERVICE_DISCOVERY_CYCLIC_REQUEST_DELAY, + ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY_MIN, + ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY_MAX, ET_WATCHDOG_ENABLE, ET_WATCHDOG_TIMEOUT, ET_WATCHDOG_ALLOWED_MISSING_PONGS, diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp index 4e7e30807..db39c7720 100644 --- a/implementation/configuration/src/configuration_impl.cpp +++ b/implementation/configuration/src/configuration_impl.cpp @@ -57,7 +57,9 @@ configuration_impl::configuration_impl(const std::string& _path) : sd_repetitions_base_delay_ {VSOMEIP_SD_DEFAULT_REPETITIONS_BASE_DELAY}, sd_repetitions_max_ {VSOMEIP_SD_DEFAULT_REPETITIONS_MAX}, sd_ttl_ {VSOMEIP_SD_DEFAULT_TTL}, sd_cyclic_offer_delay_ {VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY}, - sd_request_response_delay_ {VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY}, + sd_cyclic_request_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_REQUEST_DELAY), + sd_request_response_delay_min_(VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY_MIN), + sd_request_response_delay_max_(VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY_MAX), sd_offer_debounce_time_ {VSOMEIP_SD_DEFAULT_OFFER_DEBOUNCE_TIME}, sd_find_debounce_time_ {VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME}, sd_find_initial_debounce_reps_(VSOMEIP_SD_INITIAL_FIND_DEBOUNCE_REPS), @@ -164,7 +166,9 @@ configuration_impl::configuration_impl(const configuration_impl& _other) : sd_repetitions_max_ = _other.sd_repetitions_max_; sd_ttl_ = _other.sd_ttl_; sd_cyclic_offer_delay_= _other.sd_cyclic_offer_delay_; - sd_request_response_delay_= _other.sd_request_response_delay_; + sd_cyclic_request_delay_= _other.sd_cyclic_request_delay_; + sd_request_response_delay_min_= _other.sd_request_response_delay_min_; + sd_request_response_delay_max_= _other.sd_request_response_delay_max_; sd_find_initial_debounce_reps_ = _other.sd_find_initial_debounce_reps_; sd_find_initial_debounce_time_ = _other.sd_find_initial_debounce_time_; sd_offer_debounce_time_ = _other.sd_offer_debounce_time_; @@ -1814,14 +1818,23 @@ void configuration_impl::load_service_discovery( its_converter >> sd_cyclic_offer_delay_; is_configured_[ET_SERVICE_DISCOVERY_CYCLIC_OFFER_DELAY] = true; } - } else if (its_key == "request_response_delay") { - if (is_configured_[ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY]) { - VSOMEIP_WARNING << "Multiple definitions for service_discovery.request_response_delay." + } else if (its_key == "request_response_delay_min") { + if (is_configured_[ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY_MIN]) { + VSOMEIP_WARNING << "Multiple definitions for service_discovery.request_response_delay_min." " Ignoring definition from " << _element.name_; } else { its_converter << its_value; - its_converter >> sd_request_response_delay_; - is_configured_[ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY] = true; + its_converter >> sd_request_response_delay_min_; + is_configured_[ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY_MIN] = true; + } + } else if (its_key == "request_response_delay_max") { + if (is_configured_[ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY_MAX]) { + VSOMEIP_WARNING << "Multiple definitions for service_discovery.request_response_delay_max." + " Ignoring definition from " << _element.name_; + } else { + its_converter << its_value; + its_converter >> sd_request_response_delay_max_; + is_configured_[ET_SERVICE_DISCOVERY_REQUEST_RESPONSE_DELAY_MAX] = true; } } else if (its_key == "find_initial_debounce_reps") { if (is_configured_[ET_SERVICE_DISCOVERY_FIND_INITIAL_DEBOUNCE_REPS]) { @@ -1959,7 +1972,7 @@ void configuration_impl::load_delays( its_converter >> sd_cyclic_offer_delay_; } else if (its_key == "cyclic-request") { its_converter << std::dec << i->second.data(); - its_converter >> sd_request_response_delay_; + its_converter >> sd_cyclic_request_delay_; } else if (its_key == "ttl") { its_converter << std::dec << i->second.data(); its_converter >> sd_ttl_; @@ -3829,8 +3842,16 @@ int32_t configuration_impl::get_sd_cyclic_offer_delay() const { return sd_cyclic_offer_delay_; } -int32_t configuration_impl::get_sd_request_response_delay() const { - return sd_request_response_delay_; +int32_t configuration_impl::get_sd_cyclic_request_delay() const { + return sd_cyclic_request_delay_; +} + +int32_t configuration_impl::get_sd_request_response_delay_min() const { + return sd_request_response_delay_min_; +} + +int32_t configuration_impl::get_sd_request_response_delay_max() const { + return sd_request_response_delay_max_; } uint8_t configuration_impl::get_sd_find_initial_debounce_reps() const { diff --git a/implementation/service_discovery/include/async_sender.hpp b/implementation/service_discovery/include/async_sender.hpp new file mode 100644 index 000000000..6f32b4752 --- /dev/null +++ b/implementation/service_discovery/include/async_sender.hpp @@ -0,0 +1,88 @@ +// Copyright (C) 2024-2025 Valeo India Private Limited +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// +// /file async_sender.hpp +// +// /brief This file implements declares a class which +// implements a queue, and functions for timeout +// for the messages. On timeout, the client of +// this class will receive a callback. +// + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "message_impl.hpp" + +namespace async +{ + namespace sender + { + using time_point = std::chrono::steady_clock::time_point; + + struct async_packet_data + { + std::vector> messages_; + boost::asio::ip::address address_; + time_point timeout_; + }; + + class async_packet_send_callback + { + public: + virtual ~async_packet_send_callback() = default; + virtual void on_async_send_pkt(std::shared_ptr) = 0; + }; + + class time_point_comparator + { + public: + bool operator()(std::shared_ptr a, std::shared_ptr b) + { + return a->timeout_ > b->timeout_; + } + }; + + using priority_queue = std::priority_queue< + std::shared_ptr, + std::vector>, + time_point_comparator>; + + class async_sender + { + public: + async_sender() = default; + ~async_sender(); + void add_queue(std::shared_ptr _pkt); + void set_packet_sender_callback(std::shared_ptr callback); + void start(); + void stop(); + inline bool check_if_running() { return is_running_.load(); }; + + private: + void worker(); + void handle_packet(std::shared_ptr _pkt); + void iterate_over_queue(); + + private: + std::atomic is_running_; + std::thread thread_; + std::condition_variable cv_; + std::mutex mtx_; + priority_queue queue_; + std::shared_ptr callback_; + uint64_t minWait_; + }; + } +} diff --git a/implementation/service_discovery/include/defines.hpp b/implementation/service_discovery/include/defines.hpp index 44aedf86b..78fd2789f 100644 --- a/implementation/service_discovery/include/defines.hpp +++ b/implementation/service_discovery/include/defines.hpp @@ -44,7 +44,9 @@ #define VSOMEIP_SD_DEFAULT_REPETITIONS_MAX 3 #define VSOMEIP_SD_DEFAULT_TTL DEFAULT_TTL #define VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY 1000 -#define VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY 2000 +#define VSOMEIP_SD_DEFAULT_CYCLIC_REQUEST_DELAY 2000 +#define VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY_MIN 10 +#define VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY_MAX 30 #define VSOMEIP_SD_DEFAULT_OFFER_DEBOUNCE_TIME 500 #define VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME 500 #define VSOMEIP_SD_INITIAL_FIND_DEBOUNCE_TIME 200 diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp index 34a33f277..55c630058 100644 --- a/implementation/service_discovery/include/service_discovery.hpp +++ b/implementation/service_discovery/include/service_discovery.hpp @@ -7,6 +7,7 @@ #define VSOMEIP_V3_SD_SERVICE_DISCOVERY_HPP_ #include +#include #include #include @@ -76,6 +77,7 @@ class service_discovery { virtual void register_reboot_notification_handler( const reboot_notification_handler_t &_handler) = 0; virtual std::recursive_mutex& get_subscribed_mutex() = 0; + virtual std::chrono::milliseconds get_request_response_delay_random() const = 0; }; } // namespace sd diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 8d8f89275..0669f405c 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -27,6 +27,7 @@ #include "ipv6_option_impl.hpp" #include "deserializer.hpp" #include "message_impl.hpp" +#include "async_sender.hpp" namespace vsomeip_v3 { @@ -35,6 +36,11 @@ class serializer; namespace sd { +using time_point_clock = std::chrono::steady_clock; +using time_point = time_point_clock::time_point; +using service_instance_timepoint = std::map>; +using service_instance_entry_type = std::map>>>; + class entry_impl; class eventgroupentry_impl; class option_impl; @@ -56,8 +62,10 @@ struct entry_data_t { std::shared_ptr other_; }; -class service_discovery_impl: public service_discovery, - public std::enable_shared_from_this { +class service_discovery_impl : + public service_discovery, + public async::sender::async_packet_send_callback, + public std::enable_shared_from_this { public: service_discovery_impl(service_discovery_host *_host, const std::shared_ptr& _configuration); @@ -111,6 +119,10 @@ class service_discovery_impl: public service_discovery, void register_sd_acceptance_handler(const sd_acceptance_handler_t &_handler); void register_reboot_notification_handler( const reboot_notification_handler_t &_handler); + + void on_async_send_pkt(std::shared_ptr) override; + + std::chrono::milliseconds get_request_response_delay_random() const; private: std::pair get_session(const boost::asio::ip::address &_address); void increment_session(const boost::asio::ip::address &_address); @@ -176,7 +188,8 @@ class service_discovery_impl: public service_discovery, instance_t _instance, major_version_t _major, minor_version_t _minor, - bool _unicast_flag); + bool _unicast_flag, + bool _received_via_multicast); void process_eventgroupentry( std::shared_ptr &_entry, const std::vector > &_options, @@ -272,10 +285,11 @@ class service_discovery_impl: public service_discovery, void send_uni_or_multicast_offerservice( const std::shared_ptr &_info, - bool _unicast_flag); + bool _unicast_flag, + bool _received_via_multicast); bool last_offer_shorter_half_offer_delay_ago(); void send_unicast_offer_service( - const std::shared_ptr &_info); + const std::shared_ptr &_info, bool _received_via_multicast); void send_multicast_offer_service( const std::shared_ptr& _info); @@ -372,6 +386,15 @@ class service_discovery_impl: public service_discovery, reliability_type_e get_eventgroup_reliability( service_t _service, instance_t _instance, eventgroup_t _eventgroup, const std::shared_ptr& _subscription); + + void prepare_async_send(time_point _ts, + std::vector> &_messages, + const boost::asio::ip::address &_address); + time_point get_time_point_for_subs_offer(bool _is_subscribe, service_t _service, instance_t _instance); + void set_async_msg_pending(std::vector>& _msgs); + void reset_async_msg_pending(std::vector>& _msgs); + bool check_if_async_msg_pending(service_t _s, instance_t _i, entry_type_e _e); + void deserialize_data(const byte_t* _data, const length_t& _size, std::shared_ptr& _message); @@ -515,6 +538,12 @@ class service_discovery_impl: public service_discovery, std::mutex offer_mutex_; std::mutex check_ttl_mutex_; + service_instance_timepoint last_offer_ts_; + service_instance_timepoint last_find_ts_; + async::sender::async_sender async_sender_; + service_instance_entry_type service_instance_entry_type_; + int32_t request_response_delay_min_; + int32_t request_response_delay_max_; }; } // namespace sd diff --git a/implementation/service_discovery/src/async_sender.cpp b/implementation/service_discovery/src/async_sender.cpp new file mode 100644 index 000000000..9a252575a --- /dev/null +++ b/implementation/service_discovery/src/async_sender.cpp @@ -0,0 +1,132 @@ +// Copyright (C) 2024-2025 Valeo India Private Limited +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// +// /file async_sender.cpp +// +// /brief This file defines a class implements a queue, +// and functions for timeout for the messages. +// On timeout, the client of this class will +// receive a callback. +// + +#include "../include/async_sender.hpp" +#include +#include +#include +#include +#include +#include + +#include "../../../interface/vsomeip/internal/logger.hpp" + +namespace async +{ + namespace sender + { + async_sender::~async_sender() + { + stop(); + } + + void async_sender::start() + { + if (is_running_.load()) + { + VSOMEIP_TRACE << "async::sender thread already running, stopping previous thread.."; + stop(); + } + is_running_.store(true); + minWait_ = 0; + thread_ = std::thread(&async_sender::worker, this); + } + + void async_sender::stop() + { + is_running_.store(false); + cv_.notify_all(); // Notify the worker thread to stop + if (thread_.joinable()) + { + thread_.join(); + } + queue_ = {}; + } + + void async_sender::add_queue(std::shared_ptr _pkt) + { + VSOMEIP_TRACE << "async::sender add element"; + { + std::lock_guard lock(mtx_); + queue_.push(std::move(_pkt)); + } + cv_.notify_one(); + } + + void async_sender::set_packet_sender_callback(std::shared_ptr _callback) + { + callback_ = _callback; + } + + void async_sender::worker() + { + VSOMEIP_TRACE << "async::sender thread started"; + while (is_running_.load()) + { + std::unique_lock lock(mtx_); + if (minWait_) + { + cv_.wait_for(lock, std::chrono::microseconds(minWait_), [this] + { return !is_running_.load(); }); + } + else + { + cv_.wait(lock, [this] + { return !queue_.empty() || !is_running_.load(); }); + } + if (!is_running_.load()) + { + break; // Exit the loop if the sender is stopped + } + minWait_ = 0; + iterate_over_queue(); + } + VSOMEIP_TRACE << "async::sender thread exited"; + } + + void async_sender::handle_packet(std::shared_ptr _pkt) + { + if (callback_) + { + callback_->on_async_send_pkt(_pkt); + } + else + { + VSOMEIP_ERROR << "async::sender Callback not set "; + } + } + + void async_sender::iterate_over_queue() + { + time_point now = std::chrono::steady_clock::now(); + while (!queue_.empty()) + { + auto delay = queue_.top()->timeout_; + if (now >= delay) + { + VSOMEIP_TRACE << "async::sender thread pkt timeout"; + handle_packet( queue_.top()); + queue_.pop(); + } + else + { + std::chrono::duration duration = std::chrono::duration_cast(delay - now); + minWait_ = duration.count(); + VSOMEIP_TRACE << "async::sender thread will wait for "<get_ttl_factor_subscribes(); last_msg_received_timer_timeout_ = cyclic_offer_delay_ + (cyclic_offer_delay_ / 10); + request_response_delay_min_ = configuration_->get_sd_request_response_delay_min(); + request_response_delay_max_ = configuration_->get_sd_request_response_delay_max(); + if (request_response_delay_min_ > request_response_delay_max_) { + VSOMEIP_ERROR << "Configuration error: " + << "request_response_delay_min (" << request_response_delay_min_<< ")" + <<" > request_response_delay_max (" << request_response_delay_max_ << ")" + <<". Using default values."; + request_response_delay_min_ = VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY_MIN; + request_response_delay_max_ = VSOMEIP_SD_DEFAULT_REQUEST_RESPONSE_DELAY_MAX; + } } void @@ -210,6 +220,8 @@ service_discovery_impl::start() { start_offer_debounce_timer(true); start_find_debounce_timer(true); start_ttl_timer(); + async_sender_.set_packet_sender_callback(shared_from_this()); + async_sender_.start(); start_last_msg_received_timer(); } @@ -222,6 +234,7 @@ service_discovery_impl::stop() { stop_find_debounce_timer(); stop_offer_debounce_timer(); stop_main_phase_timer(); + async_sender_.stop(); } void @@ -403,7 +416,8 @@ service_discovery_impl::send_subscription( << std::setw(4) << _eventgroup << "] "; } - if (its_data.entry_) { + if (its_data.entry_ && + !check_if_async_msg_pending(_service, _instance, its_data.entry_->get_type())) { // TODO: Implement a simple path, that sends a single message auto its_current_message = std::make_shared(); std::vector > its_messages; @@ -411,7 +425,7 @@ service_discovery_impl::send_subscription( add_entry_data(its_messages, its_data); - serialize_and_send(its_messages, its_address); + prepare_async_send(get_time_point_for_subs_offer(true,_service,_instance),its_messages,its_address); } } } @@ -1316,14 +1330,43 @@ service_discovery_impl::on_message( // check resubscriptions for validity for (auto iter = its_resubscribes.begin(); iter != its_resubscribes.end();) { - if ((*iter)->get_entries().empty() || (*iter)->get_options().empty()) { + bool shouldErase = false; + if ((*iter)->get_entries().empty() || (*iter)->get_options().empty()){ + shouldErase = true; + } + else { + std::vector> entris = (*iter)->get_entries(); + for (auto entry : entris) + { + // if subscription request already added in async sender queue + // we do not want to add RE-subscrition request to async sender queue + // seen in case repetitions_base_delay is less than request resonse delay + if (check_if_async_msg_pending( + entry->get_service(), + entry->get_instance(), + entry->get_type())) { + VSOMEIP_WARNING << "sd::" << __func__ << ": Async send pendng for subscribe. Ignoring this [" + << std::hex << std::setfill('0') + << std::setw(4) << entry->get_service() << "." + << std::setw(4) << entry->get_instance() << "] " + << std::setw(4) << "entry_type_e::SUBSCRIBE_EVENTGROUP"; + shouldErase = true; + break; + } + } + } + if (shouldErase) { iter = its_resubscribes.erase(iter); } else { iter++; } } if (!its_resubscribes.empty()) { - serialize_and_send(its_resubscribes, _sender); + if(_is_multicast){ + prepare_async_send(time_point_clock::now(),its_resubscribes,_sender); + } else{ + serialize_and_send(its_resubscribes, _sender); + } } } else { VSOMEIP_ERROR << "service_discovery_impl::" << __func__ << ": Deserialization error."; @@ -1455,7 +1498,7 @@ void service_discovery_impl::process_serviceentry( switch (its_type) { case entry_type_e::FIND_SERVICE: process_findservice_serviceentry(its_service, its_instance, its_major, its_minor, - _unicast_flag); + _unicast_flag, _received_via_multicast); break; case entry_type_e::OFFER_SERVICE: process_offerservice_serviceentry(its_service, its_instance, its_major, its_minor, @@ -1620,6 +1663,7 @@ void service_discovery_impl::process_offerservice_serviceentry( // No need to resubscribe for unicast offers if (_received_via_multicast) { + last_offer_ts_[_service][_instance] = time_point_clock::now(); auto found_service = subscribed_.find(_service); if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); @@ -1665,6 +1709,9 @@ void service_discovery_impl::process_offerservice_serviceentry( } } } + } else{ + // offer received via unicast, assign zero + last_offer_ts_[_service][_instance] = time_point(); } host_->add_routing_info(_service, _instance, _major, _minor, @@ -1673,12 +1720,12 @@ void service_discovery_impl::process_offerservice_serviceentry( _unreliable_port); } -void -service_discovery_impl::process_findservice_serviceentry( - service_t _service, instance_t _instance, - major_version_t _major, minor_version_t _minor, - bool _unicast_flag) { +void service_discovery_impl::process_findservice_serviceentry( + service_t _service, instance_t _instance, + major_version_t _major, minor_version_t _minor, + bool _unicast_flag, bool _received_via_multicast) { + last_find_ts_[_service][_instance] = time_point_clock::now(); if (_instance != ANY_INSTANCE) { std::shared_ptr its_info = host_->get_offered_service( _service, _instance); @@ -1686,7 +1733,7 @@ service_discovery_impl::process_findservice_serviceentry( if (_major == ANY_MAJOR || _major == its_info->get_major()) { if (_minor == 0xFFFFFFFF || _minor <= its_info->get_minor()) { if (its_info->get_endpoint(false) || its_info->get_endpoint(true)) { - send_uni_or_multicast_offerservice(its_info, _unicast_flag); + send_uni_or_multicast_offerservice(its_info, _unicast_flag,_received_via_multicast); } } } @@ -1700,7 +1747,7 @@ service_discovery_impl::process_findservice_serviceentry( if (_major == ANY_MAJOR || _major == its_info->get_major()) { if (_minor == 0xFFFFFFFF || _minor <= its_info->get_minor()) { if (its_info->get_endpoint(false) || its_info->get_endpoint(true)) { - send_uni_or_multicast_offerservice(its_info, _unicast_flag); + send_uni_or_multicast_offerservice(its_info, _unicast_flag,_received_via_multicast); } } } @@ -1708,9 +1755,9 @@ service_discovery_impl::process_findservice_serviceentry( } } -void -service_discovery_impl::send_unicast_offer_service( - const std::shared_ptr &_info) { +void service_discovery_impl::send_unicast_offer_service( + const std::shared_ptr &_info, + bool _received_via_multicast) { std::shared_ptr its_runtime = runtime_.lock(); if (!its_runtime) { return; @@ -1722,21 +1769,43 @@ service_discovery_impl::send_unicast_offer_service( insert_offer_service(its_messages, _info); - serialize_and_send(its_messages, current_remote_address_); + if (_received_via_multicast) { + service_t _service = _info->get_service(); + instance_t _instance = _info->get_instance(); + if (check_if_async_msg_pending(_service, _instance, entry_type_e::OFFER_SERVICE)) { + VSOMEIP_WARNING << "sd::" << __func__ << ": Async send pendng for offer. Ignoring this [" + << std::hex << std::setfill('0') + << std::setw(4) << _service << "." + << std::setw(4) << _instance << "] " + << std::setw(4) << "entry_type_e::OFFER_SERVICE"; + return; + } + prepare_async_send(get_time_point_for_subs_offer(false,_service,_instance),its_messages,current_remote_address_); + } else{ + serialize_and_send(its_messages,current_remote_address_); + } } void service_discovery_impl::send_multicast_offer_service( const std::shared_ptr &_info) { + service_t _service = _info->get_service(); + instance_t _instance = _info->get_instance(); + if (check_if_async_msg_pending(_service, _instance, entry_type_e::OFFER_SERVICE)) { + VSOMEIP_WARNING << "sd::" << __func__ << ": Async send pendng for offer. Ignoring this [" + << std::hex << std::setfill('0') + << std::setw(4) << _service << "." + << std::setw(4) << _instance << "] " + << std::setw(4) << "entry_type_e::OFFER_SERVICE"; + return; + } auto its_offer_message(std::make_shared()); std::vector > its_messages; its_messages.push_back(its_offer_message); insert_offer_service(its_messages, _info); - // send message as multicast offer service the same way it is sent - // on the repetition phase to preserve the session id - send(its_messages); + prepare_async_send(get_time_point_for_subs_offer(false,_service,_instance),its_messages,sd_multicast_address_); } void @@ -1828,7 +1897,7 @@ service_discovery_impl::on_endpoint_connected( } } - serialize_and_send(its_messages, its_address); + prepare_async_send(get_time_point_for_subs_offer(true,_service,_instance),its_messages,its_address); } std::shared_ptr @@ -3334,12 +3403,12 @@ service_discovery_impl::on_main_phase_timer_expired( start_main_phase_timer(); } -void -service_discovery_impl::send_uni_or_multicast_offerservice( - const std::shared_ptr &_info, bool _unicast_flag) { +void service_discovery_impl::send_uni_or_multicast_offerservice( + const std::shared_ptr &_info, bool _unicast_flag, + bool _received_via_multicast) { if (_unicast_flag) { // SID_SD_826 if (last_offer_shorter_half_offer_delay_ago()) { // SIP_SD_89 - send_unicast_offer_service(_info); + send_unicast_offer_service(_info,_received_via_multicast); } else { // SIP_SD_90 send_multicast_offer_service(_info); } @@ -4031,6 +4100,114 @@ reliability_type_e service_discovery_impl::get_eventgroup_reliability( return its_reliability; } +void service_discovery_impl::prepare_async_send(time_point _ts, + std::vector> &_messages, + const boost::asio::ip::address &_address) +{ + if (0 == _ts.time_since_epoch().count()) { + VSOMEIP_INFO << "sd::" << __func__ << ": ts not set for async, sending packet now."; + serialize_and_send(_messages, _address); + return; + } + time_point ts = _ts + get_request_response_delay_random(); + std::shared_ptr _pkt = std::make_shared(); + _pkt->messages_ =_messages; + _pkt->address_ = _address; + _pkt->timeout_ = ts; + set_async_msg_pending(_messages); + async_sender_.add_queue(std::move(_pkt)); +} + +time_point service_discovery_impl::get_time_point_for_subs_offer(bool _is_subscribe, service_t _service, instance_t _instance){ + if(_is_subscribe){ + auto serviceMap = last_offer_ts_.find(_service); + if(serviceMap != last_offer_ts_.end()){ + auto instanceMap = serviceMap->second.find(_instance); + if(instanceMap != serviceMap->second.end()){ + return instanceMap->second; + } + } + } else{ + auto serviceMap = last_find_ts_.find(_service); + if(serviceMap != last_find_ts_.end()){ + auto instanceMap = serviceMap->second.find(_instance); + if(instanceMap != serviceMap->second.end()){ + return instanceMap->second; + } + } + } + return time_point(); +} + +void service_discovery_impl::on_async_send_pkt(std::shared_ptr _pkt) { + reset_async_msg_pending(_pkt->messages_); + serialize_and_send(_pkt->messages_,_pkt->address_); +} + +void service_discovery_impl::reset_async_msg_pending(std::vector>& _msgs){ + std::vector> entris; + for(auto msg : _msgs){ + entris = msg->get_entries(); + for(auto entry : entris){ + service_instance_entry_type_[entry->get_service()][entry->get_instance()][entry->get_type()] = false; + } + } +} + +void service_discovery_impl::set_async_msg_pending(std::vector>& _msgs){ + std::vector> entris; + for(auto msg : _msgs){ + entris = msg->get_entries(); + for(auto entry : entris){ + service_instance_entry_type_[entry->get_service()][entry->get_instance()][entry->get_type()] = true; + } + } +} + +bool service_discovery_impl::check_if_async_msg_pending(service_t _s, instance_t _i, entry_type_e _e) { + bool ret = false; + auto hasService = service_instance_entry_type_.find(_s); + if(hasService != service_instance_entry_type_.end()){ + auto hasInstance = hasService->second.find(_i); + if(hasInstance != hasService->second.end()){ + auto hasType = hasInstance->second.find(_e); + if(hasType != hasInstance->second.end()){ + ret = hasType->second; + } + } + } + return ret; +} + +std::chrono::milliseconds service_discovery_impl::get_request_response_delay_random() const { + try { + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution distribution( + request_response_delay_min_, request_response_delay_max_); + return std::chrono::milliseconds(distribution(gen)); + } + catch (const std::exception &e) { + VSOMEIP_ERROR << "Failed to generate random request response delay" << e.what(); + + // Fallback to the Mersenne Twister engine + const auto seed = static_cast( + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count()); + + static std::mt19937 mtwister{seed}; + + // Interpolate between request_response_delay bounds + return std::chrono::milliseconds( + (request_response_delay_min_ + + (static_cast(mtwister()) * + static_cast(request_response_delay_max_ - request_response_delay_min_) / + static_cast(std::mt19937::max() - + std::mt19937::min())))); + } +} + void service_discovery_impl::deserialize_data(const byte_t* _data, const length_t& _size, std::shared_ptr& _message) { std::lock_guard its_lock(deserialize_mutex_);