From dfe3ec00edb762c123658c361887f5af79fe3baf Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Wed, 28 Feb 2024 10:25:23 +0100 Subject: [PATCH 1/4] Refs #20256: Reject old samples Signed-off-by: Jesus Perez --- src/cpp/database/database.cpp | 223 ++++++++++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) diff --git a/src/cpp/database/database.cpp b/src/cpp/database/database.cpp index 9d5663db..d2f47886 100644 --- a/src/cpp/database/database.cpp +++ b/src/cpp/database/database.cpp @@ -943,6 +943,13 @@ void Database::insert_nts( if (writer != domain_writers->second.end()) { const HistoryLatencySample& fastdds_latency = dynamic_cast(sample); + + // Reject samples with old timestamps + if (fastdds_latency.src_ts <= writer->second->data.history2history_latency[fastdds_latency.reader].back().src_ts) + { + break; + } + writer->second->data.history2history_latency[fastdds_latency.reader].push_back(fastdds_latency); break; } @@ -961,6 +968,13 @@ void Database::insert_nts( if (participant != domain_participants->second.end()) { const NetworkLatencySample& network_latency = dynamic_cast(sample); + + // Reject samples with old timestamps + if (network_latency.src_ts <= participant->second->data.network_latency_per_locator[network_latency.remote_locator].back().src_ts) + { + break; + } + participant->second->data.network_latency_per_locator[network_latency.remote_locator].push_back( network_latency); break; @@ -981,6 +995,13 @@ void Database::insert_nts( { const PublicationThroughputSample& publication_throughput = dynamic_cast(sample); + + // Reject samples with old timestamps + if (publication_throughput.src_ts <= writer->second->data.publication_throughput.back().src_ts) + { + break; + } + writer->second->data.publication_throughput.push_back(publication_throughput); break; } @@ -1000,6 +1021,13 @@ void Database::insert_nts( { const SubscriptionThroughputSample& subscription_throughput = dynamic_cast(sample); + + // Reject samples with old timestamps + if (subscription_throughput.src_ts <= reader->second->data.subscription_throughput.back().src_ts) + { + break; + } + reader->second->data.subscription_throughput.push_back(subscription_throughput); break; } @@ -1019,6 +1047,13 @@ void Database::insert_nts( { const RtpsPacketsSentSample& rtps_packets_sent = dynamic_cast(sample); + // Reject samples with old timestamps + if (rtps_packets_sent.src_ts <= participant->second->data.last_reported_rtps_packets_sent_count[ + rtps_packets_sent.remote_locator].src_ts) + { + break; + } + // Create remote_locator if it does not exist get_locator_nts(rtps_packets_sent.remote_locator); @@ -1070,6 +1105,13 @@ void Database::insert_nts( { const RtpsBytesSentSample& rtps_bytes_sent = dynamic_cast(sample); + // Reject samples with old timestamps + if (rtps_bytes_sent.src_ts <= participant->second->data.last_reported_rtps_bytes_sent_count[ + rtps_bytes_sent.remote_locator].src_ts) + { + break; + } + // Create remote_locator if it does not exist get_locator_nts(rtps_bytes_sent.remote_locator); @@ -1119,6 +1161,13 @@ void Database::insert_nts( { const RtpsPacketsLostSample& rtps_packets_lost = dynamic_cast(sample); + // Reject samples with old timestamps + if (rtps_packets_lost.src_ts <= participant->second->data.last_reported_rtps_packets_lost_count[ + rtps_packets_lost.remote_locator].src_ts) + { + break; + } + // Create remote_locator if it does not exist get_locator_nts(rtps_packets_lost.remote_locator); @@ -1170,6 +1219,13 @@ void Database::insert_nts( { const RtpsBytesLostSample& rtps_bytes_lost = dynamic_cast(sample); + // Reject samples with old timestamps + if (rtps_bytes_lost.src_ts <= participant->second->data.last_reported_rtps_bytes_lost_count[ + rtps_bytes_lost.remote_locator].src_ts) + { + break; + } + // Create remote_locator if it does not exist get_locator_nts(rtps_bytes_lost.remote_locator); @@ -1219,6 +1275,12 @@ void Database::insert_nts( { const ResentDataSample& resent_datas = dynamic_cast(sample); + // Reject samples with old timestamps + if (resent_datas.src_ts <= writer->second->data.last_reported_resent_datas.src_ts) + { + break; + } + // Check if the insertion is from the load if (loading) { @@ -1260,6 +1322,12 @@ void Database::insert_nts( { const HeartbeatCountSample& heartbeat_count = dynamic_cast(sample); + // Reject samples with old timestamps + if (heartbeat_count.src_ts <= writer->second->data.last_reported_heartbeat_count.src_ts) + { + break; + } + // Check if the insertion is from the load if (loading) { @@ -1301,6 +1369,12 @@ void Database::insert_nts( { const AcknackCountSample& acknack_count = dynamic_cast(sample); + // Reject samples with old timestamps + if (acknack_count.src_ts <= reader->second->data.last_reported_acknack_count.src_ts) + { + break; + } + // Check if the insertion is from the load if (loading) { @@ -1342,6 +1416,12 @@ void Database::insert_nts( { const NackfragCountSample& nackfrag_count = dynamic_cast(sample); + // Reject samples with old timestamps + if (nackfrag_count.src_ts <= reader->second->data.last_reported_nackfrag_count.src_ts) + { + break; + } + // Check if the insertion is from the load if (loading) { @@ -1383,6 +1463,12 @@ void Database::insert_nts( { const GapCountSample& gap_count = dynamic_cast(sample); + // Reject samples with old timestamps + if (gap_count.src_ts <= writer->second->data.last_reported_gap_count.src_ts) + { + break; + } + // Check if the insertion is from the load if (loading) { @@ -1424,6 +1510,12 @@ void Database::insert_nts( { const DataCountSample& data_count = dynamic_cast(sample); + // Reject samples with old timestamps + if (data_count.src_ts <= writer->second->data.last_reported_data_count.src_ts) + { + break; + } + // Check if the insertion is from the load if (loading) { @@ -1465,6 +1557,11 @@ void Database::insert_nts( { const PdpCountSample& pdp_packets = dynamic_cast(sample); + // Reject samples with old timestamps + if (pdp_packets.src_ts <= participant->second->data.last_reported_pdp_packets.src_ts) + { + break; + } // Check if the insertion is from the load if (loading) { @@ -1507,6 +1604,12 @@ void Database::insert_nts( { const EdpCountSample& edp_packets = dynamic_cast(sample); + // Reject samples with old timestamps + if (edp_packets.src_ts <= participant->second->data.last_reported_edp_packets.src_ts) + { + break; + } + // Check if the insertion is from the load if (loading) { @@ -1547,6 +1650,13 @@ void Database::insert_nts( if (participant != domain_participants->second.end()) { const DiscoveryTimeSample& discovery_time = dynamic_cast(sample); + + // Reject samples with old timestamps + if (discovery_time.src_ts <= participant->second->data.discovered_entity[discovery_time.remote_entity].back().src_ts) + { + break; + } + participant->second->data.discovered_entity[discovery_time.remote_entity].push_back(discovery_time); break; } @@ -1565,6 +1675,13 @@ void Database::insert_nts( if (writer != domain_writers->second.end()) { const SampleDatasCountSample& sample_datas = dynamic_cast(sample); + + // Reject samples with old timestamps + if (sample_datas.src_ts <= writer->second->data.sample_datas[sample_datas.sequence_number].back().src_ts) + { + break; + } + // Only save the last received sample for each sequence number writer->second->data.sample_datas[sample_datas.sequence_number].clear(); writer->second->data.sample_datas[sample_datas.sequence_number].push_back(sample_datas); @@ -1610,6 +1727,13 @@ bool Database::insert_nts( std::dynamic_pointer_cast(entity); std::shared_ptr participant = std::const_pointer_cast( const_participant); + + // Reject samples with old timestamps + if (proxy.src_ts <= participant->monitor_service_data.proxy.back().src_ts) + { + break; + } + participant->monitor_service_data.proxy.push_back(proxy); break; } @@ -1618,6 +1742,13 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast( entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (proxy.src_ts <= datareader->monitor_service_data.proxy.back().src_ts) + { + break; + } + datareader->monitor_service_data.proxy.push_back(proxy); break; } @@ -1626,6 +1757,13 @@ bool Database::insert_nts( std::shared_ptr const_datawriter = std::dynamic_pointer_cast( entity); std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); + + // Reject samples with old timestamps + if (proxy.src_ts <= datawriter->monitor_service_data.proxy.back().src_ts) + { + break; + } + datawriter->monitor_service_data.proxy.push_back(proxy); break; } @@ -1648,6 +1786,13 @@ bool Database::insert_nts( std::dynamic_pointer_cast(entity); std::shared_ptr participant = std::const_pointer_cast( const_participant); + + // Reject samples with old timestamps + if (connection_list.src_ts <= participant->monitor_service_data.connection_list.back().src_ts) + { + break; + } + participant->monitor_service_data.connection_list.push_back(connection_list); break; } @@ -1656,6 +1801,13 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast( entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (connection_list.src_ts <= datareader->monitor_service_data.connection_list.back().src_ts) + { + break; + } + datareader->monitor_service_data.connection_list.push_back(connection_list); break; } @@ -1664,6 +1816,13 @@ bool Database::insert_nts( std::shared_ptr const_datawriter = std::dynamic_pointer_cast( entity); std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); + + // Reject samples with old timestamps + if (connection_list.src_ts <= datawriter->monitor_service_data.connection_list.back().src_ts) + { + break; + } + datawriter->monitor_service_data.connection_list.push_back(connection_list); break; } @@ -1685,6 +1844,13 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast( entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (incompatible_qos.src_ts <= datareader->monitor_service_data.incompatible_qos.back().src_ts) + { + break; + } + datareader->monitor_service_data.incompatible_qos.push_back(incompatible_qos); entity_updated = update_entity_status_nts(datareader); break; @@ -1694,6 +1860,13 @@ bool Database::insert_nts( std::shared_ptr const_datawriter = std::dynamic_pointer_cast( entity); std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); + + // Reject samples with old timestamps + if (incompatible_qos.src_ts <= datawriter->monitor_service_data.incompatible_qos.back().src_ts) + { + break; + } + datawriter->monitor_service_data.incompatible_qos.push_back(incompatible_qos); entity_updated = update_entity_status_nts(datawriter); break; @@ -1716,6 +1889,13 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast( entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (inconsistent_topic.src_ts <= datareader->monitor_service_data.inconsistent_topic.back().src_ts) + { + break; + } + datareader->monitor_service_data.inconsistent_topic.push_back(inconsistent_topic); entity_updated = update_entity_status_nts(datareader); break; @@ -1725,7 +1905,15 @@ bool Database::insert_nts( std::shared_ptr const_datawriter = std::dynamic_pointer_cast( entity); std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); + + // Reject samples with old timestamps + if (inconsistent_topic.src_ts <= datawriter->monitor_service_data.inconsistent_topic.back().src_ts) + { + break; + } + datawriter->monitor_service_data.inconsistent_topic.push_back(inconsistent_topic); + entity_updated = update_entity_status_nts(datawriter); break; } @@ -1745,6 +1933,13 @@ bool Database::insert_nts( std::shared_ptr const_datawriter = std::dynamic_pointer_cast(entity); std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); + + // Reject samples with old timestamps + if (liveliness_lost.src_ts <= datawriter->monitor_service_data.liveliness_lost.back().src_ts) + { + break; + } + datawriter->monitor_service_data.liveliness_lost.push_back(liveliness_lost); entity_updated = update_entity_status_nts(datawriter); break; @@ -1764,6 +1959,13 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast(entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (liveliness_changed.src_ts <= datareader->monitor_service_data.liveliness_changed.back().src_ts) + { + break; + } + datareader->monitor_service_data.liveliness_changed.push_back(liveliness_changed); break; } @@ -1784,6 +1986,13 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast( entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (deadline_missed.src_ts <= datareader->monitor_service_data.deadline_missed.back().src_ts) + { + break; + } + datareader->monitor_service_data.deadline_missed.push_back(deadline_missed); entity_updated = update_entity_status_nts(datareader); break; @@ -1793,6 +2002,13 @@ bool Database::insert_nts( std::shared_ptr const_datawriter = std::dynamic_pointer_cast( entity); std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); + + // Reject samples with old timestamps + if (deadline_missed.src_ts <= datawriter->monitor_service_data.deadline_missed.back().src_ts) + { + break; + } + datawriter->monitor_service_data.deadline_missed.push_back(deadline_missed); entity_updated = update_entity_status_nts(datawriter); break; @@ -1813,6 +2029,13 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast(entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (sample_lost.src_ts <= datareader->monitor_service_data.sample_lost.back().src_ts) + { + break; + } + datareader->monitor_service_data.sample_lost.push_back(sample_lost); entity_updated = update_entity_status_nts(datareader); break; From 4dab0812a41cfa84d832d4fd0c425155a2224388 Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Wed, 28 Feb 2024 15:01:14 +0100 Subject: [PATCH 2/4] Refs #20256: Add test + fixes Signed-off-by: Jesus Perez --- src/cpp/database/database.cpp | 107 +++++++++++++------ test/unittest/Database/CMakeLists.txt | 1 + test/unittest/Database/DatabaseDumpTests.cpp | 20 +++- test/unittest/Database/DatabaseTests.cpp | 53 +++++++-- 4 files changed, 138 insertions(+), 43 deletions(-) diff --git a/src/cpp/database/database.cpp b/src/cpp/database/database.cpp index d2f47886..840e3e03 100644 --- a/src/cpp/database/database.cpp +++ b/src/cpp/database/database.cpp @@ -945,7 +945,9 @@ void Database::insert_nts( const HistoryLatencySample& fastdds_latency = dynamic_cast(sample); // Reject samples with old timestamps - if (fastdds_latency.src_ts <= writer->second->data.history2history_latency[fastdds_latency.reader].back().src_ts) + if (writer->second->data.history2history_latency.find(fastdds_latency.reader) != + writer->second->data.history2history_latency.end() && + fastdds_latency.src_ts <= writer->second->data.history2history_latency[fastdds_latency.reader].back().src_ts) { break; } @@ -970,7 +972,9 @@ void Database::insert_nts( const NetworkLatencySample& network_latency = dynamic_cast(sample); // Reject samples with old timestamps - if (network_latency.src_ts <= participant->second->data.network_latency_per_locator[network_latency.remote_locator].back().src_ts) + if (participant->second->data.network_latency_per_locator.find(network_latency.remote_locator) != + participant->second->data.network_latency_per_locator.end() && + network_latency.src_ts <= participant->second->data.network_latency_per_locator[network_latency.remote_locator].back().src_ts) { break; } @@ -997,7 +1001,8 @@ void Database::insert_nts( dynamic_cast(sample); // Reject samples with old timestamps - if (publication_throughput.src_ts <= writer->second->data.publication_throughput.back().src_ts) + if (!writer->second->data.publication_throughput.empty() && + publication_throughput.src_ts <= writer->second->data.publication_throughput.back().src_ts) { break; } @@ -1023,7 +1028,8 @@ void Database::insert_nts( dynamic_cast(sample); // Reject samples with old timestamps - if (subscription_throughput.src_ts <= reader->second->data.subscription_throughput.back().src_ts) + if (!reader->second->data.subscription_throughput.empty() && + subscription_throughput.src_ts <= reader->second->data.subscription_throughput.back().src_ts) { break; } @@ -1048,7 +1054,9 @@ void Database::insert_nts( const RtpsPacketsSentSample& rtps_packets_sent = dynamic_cast(sample); // Reject samples with old timestamps - if (rtps_packets_sent.src_ts <= participant->second->data.last_reported_rtps_packets_sent_count[ + if (participant->second->data.rtps_packets_sent.find(rtps_packets_sent.remote_locator) != + participant->second->data.rtps_packets_sent.end() && + rtps_packets_sent.src_ts <= participant->second->data.last_reported_rtps_packets_sent_count[ rtps_packets_sent.remote_locator].src_ts) { break; @@ -1106,7 +1114,9 @@ void Database::insert_nts( const RtpsBytesSentSample& rtps_bytes_sent = dynamic_cast(sample); // Reject samples with old timestamps - if (rtps_bytes_sent.src_ts <= participant->second->data.last_reported_rtps_bytes_sent_count[ + if (participant->second->data.rtps_bytes_sent.find(rtps_bytes_sent.remote_locator) != + participant->second->data.rtps_bytes_sent.end() && + rtps_bytes_sent.src_ts <= participant->second->data.last_reported_rtps_bytes_sent_count[ rtps_bytes_sent.remote_locator].src_ts) { break; @@ -1162,7 +1172,9 @@ void Database::insert_nts( const RtpsPacketsLostSample& rtps_packets_lost = dynamic_cast(sample); // Reject samples with old timestamps - if (rtps_packets_lost.src_ts <= participant->second->data.last_reported_rtps_packets_lost_count[ + if (participant->second->data.rtps_packets_lost.find(rtps_packets_lost.remote_locator) != + participant->second->data.rtps_packets_lost.end() && + rtps_packets_lost.src_ts <= participant->second->data.last_reported_rtps_packets_lost_count[ rtps_packets_lost.remote_locator].src_ts) { break; @@ -1220,7 +1232,9 @@ void Database::insert_nts( const RtpsBytesLostSample& rtps_bytes_lost = dynamic_cast(sample); // Reject samples with old timestamps - if (rtps_bytes_lost.src_ts <= participant->second->data.last_reported_rtps_bytes_lost_count[ + if (participant->second->data.rtps_bytes_lost.find(rtps_bytes_lost.remote_locator) != + participant->second->data.rtps_bytes_lost.end() && + rtps_bytes_lost.src_ts <= participant->second->data.last_reported_rtps_bytes_lost_count[ rtps_bytes_lost.remote_locator].src_ts) { break; @@ -1276,7 +1290,8 @@ void Database::insert_nts( const ResentDataSample& resent_datas = dynamic_cast(sample); // Reject samples with old timestamps - if (resent_datas.src_ts <= writer->second->data.last_reported_resent_datas.src_ts) + if (!writer->second->data.resent_datas.empty() && + resent_datas.src_ts <= writer->second->data.last_reported_resent_datas.src_ts) { break; } @@ -1323,7 +1338,8 @@ void Database::insert_nts( const HeartbeatCountSample& heartbeat_count = dynamic_cast(sample); // Reject samples with old timestamps - if (heartbeat_count.src_ts <= writer->second->data.last_reported_heartbeat_count.src_ts) + if (!writer->second->data.heartbeat_count.empty() && + heartbeat_count.src_ts <= writer->second->data.last_reported_heartbeat_count.src_ts) { break; } @@ -1370,7 +1386,8 @@ void Database::insert_nts( const AcknackCountSample& acknack_count = dynamic_cast(sample); // Reject samples with old timestamps - if (acknack_count.src_ts <= reader->second->data.last_reported_acknack_count.src_ts) + if (!reader->second->data.acknack_count.empty() && + acknack_count.src_ts <= reader->second->data.last_reported_acknack_count.src_ts) { break; } @@ -1417,7 +1434,8 @@ void Database::insert_nts( const NackfragCountSample& nackfrag_count = dynamic_cast(sample); // Reject samples with old timestamps - if (nackfrag_count.src_ts <= reader->second->data.last_reported_nackfrag_count.src_ts) + if (!reader->second->data.nackfrag_count.empty() && + nackfrag_count.src_ts <= reader->second->data.last_reported_nackfrag_count.src_ts) { break; } @@ -1464,7 +1482,8 @@ void Database::insert_nts( const GapCountSample& gap_count = dynamic_cast(sample); // Reject samples with old timestamps - if (gap_count.src_ts <= writer->second->data.last_reported_gap_count.src_ts) + if (!writer->second->data.gap_count.empty() && + gap_count.src_ts <= writer->second->data.last_reported_gap_count.src_ts) { break; } @@ -1511,7 +1530,8 @@ void Database::insert_nts( const DataCountSample& data_count = dynamic_cast(sample); // Reject samples with old timestamps - if (data_count.src_ts <= writer->second->data.last_reported_data_count.src_ts) + if (!writer->second->data.data_count.empty() && + data_count.src_ts <= writer->second->data.last_reported_data_count.src_ts) { break; } @@ -1558,7 +1578,8 @@ void Database::insert_nts( const PdpCountSample& pdp_packets = dynamic_cast(sample); // Reject samples with old timestamps - if (pdp_packets.src_ts <= participant->second->data.last_reported_pdp_packets.src_ts) + if (!participant->second->data.pdp_packets.empty() && + pdp_packets.src_ts <= participant->second->data.last_reported_pdp_packets.src_ts) { break; } @@ -1605,7 +1626,8 @@ void Database::insert_nts( const EdpCountSample& edp_packets = dynamic_cast(sample); // Reject samples with old timestamps - if (edp_packets.src_ts <= participant->second->data.last_reported_edp_packets.src_ts) + if (!participant->second->data.edp_packets.empty() && + edp_packets.src_ts <= participant->second->data.last_reported_edp_packets.src_ts) { break; } @@ -1652,7 +1674,9 @@ void Database::insert_nts( const DiscoveryTimeSample& discovery_time = dynamic_cast(sample); // Reject samples with old timestamps - if (discovery_time.src_ts <= participant->second->data.discovered_entity[discovery_time.remote_entity].back().src_ts) + if (participant->second->data.discovered_entity.find(discovery_time.remote_entity) != + participant->second->data.discovered_entity.end() && + discovery_time.src_ts <= participant->second->data.discovered_entity[discovery_time.remote_entity].back().src_ts) { break; } @@ -1677,7 +1701,9 @@ void Database::insert_nts( const SampleDatasCountSample& sample_datas = dynamic_cast(sample); // Reject samples with old timestamps - if (sample_datas.src_ts <= writer->second->data.sample_datas[sample_datas.sequence_number].back().src_ts) + if (writer->second->data.sample_datas.find(sample_datas.sequence_number) != + writer->second->data.sample_datas.end() && + sample_datas.src_ts <= writer->second->data.sample_datas[sample_datas.sequence_number].back().src_ts) { break; } @@ -1729,7 +1755,8 @@ bool Database::insert_nts( const_participant); // Reject samples with old timestamps - if (proxy.src_ts <= participant->monitor_service_data.proxy.back().src_ts) + if (!participant->monitor_service_data.proxy.empty() && + proxy.src_ts <= participant->monitor_service_data.proxy.back().src_ts) { break; } @@ -1744,7 +1771,8 @@ bool Database::insert_nts( std::shared_ptr datareader = std::const_pointer_cast(const_datareader); // Reject samples with old timestamps - if (proxy.src_ts <= datareader->monitor_service_data.proxy.back().src_ts) + if (!datareader->monitor_service_data.proxy.empty() && + proxy.src_ts <= datareader->monitor_service_data.proxy.back().src_ts) { break; } @@ -1759,7 +1787,8 @@ bool Database::insert_nts( std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); // Reject samples with old timestamps - if (proxy.src_ts <= datawriter->monitor_service_data.proxy.back().src_ts) + if (datawriter->monitor_service_data.proxy.empty() && + proxy.src_ts <= datawriter->monitor_service_data.proxy.back().src_ts) { break; } @@ -1788,7 +1817,8 @@ bool Database::insert_nts( const_participant); // Reject samples with old timestamps - if (connection_list.src_ts <= participant->monitor_service_data.connection_list.back().src_ts) + if (!participant->monitor_service_data.connection_list.empty() && + connection_list.src_ts <= participant->monitor_service_data.connection_list.back().src_ts) { break; } @@ -1803,7 +1833,8 @@ bool Database::insert_nts( std::shared_ptr datareader = std::const_pointer_cast(const_datareader); // Reject samples with old timestamps - if (connection_list.src_ts <= datareader->monitor_service_data.connection_list.back().src_ts) + if (!datareader->monitor_service_data.connection_list.empty() && + connection_list.src_ts <= datareader->monitor_service_data.connection_list.back().src_ts) { break; } @@ -1818,7 +1849,8 @@ bool Database::insert_nts( std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); // Reject samples with old timestamps - if (connection_list.src_ts <= datawriter->monitor_service_data.connection_list.back().src_ts) + if (!datawriter->monitor_service_data.connection_list.empty() && + connection_list.src_ts <= datawriter->monitor_service_data.connection_list.back().src_ts) { break; } @@ -1846,7 +1878,8 @@ bool Database::insert_nts( std::shared_ptr datareader = std::const_pointer_cast(const_datareader); // Reject samples with old timestamps - if (incompatible_qos.src_ts <= datareader->monitor_service_data.incompatible_qos.back().src_ts) + if (!datareader->monitor_service_data.incompatible_qos.empty() && + incompatible_qos.src_ts <= datareader->monitor_service_data.incompatible_qos.back().src_ts) { break; } @@ -1862,7 +1895,8 @@ bool Database::insert_nts( std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); // Reject samples with old timestamps - if (incompatible_qos.src_ts <= datawriter->monitor_service_data.incompatible_qos.back().src_ts) + if (!datawriter->monitor_service_data.incompatible_qos.empty() && + incompatible_qos.src_ts <= datawriter->monitor_service_data.incompatible_qos.back().src_ts) { break; } @@ -1891,7 +1925,8 @@ bool Database::insert_nts( std::shared_ptr datareader = std::const_pointer_cast(const_datareader); // Reject samples with old timestamps - if (inconsistent_topic.src_ts <= datareader->monitor_service_data.inconsistent_topic.back().src_ts) + if (!datareader->monitor_service_data.inconsistent_topic.empty() && + inconsistent_topic.src_ts <= datareader->monitor_service_data.inconsistent_topic.back().src_ts) { break; } @@ -1907,7 +1942,8 @@ bool Database::insert_nts( std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); // Reject samples with old timestamps - if (inconsistent_topic.src_ts <= datawriter->monitor_service_data.inconsistent_topic.back().src_ts) + if (!datawriter->monitor_service_data.inconsistent_topic.empty() && + inconsistent_topic.src_ts <= datawriter->monitor_service_data.inconsistent_topic.back().src_ts) { break; } @@ -1935,7 +1971,8 @@ bool Database::insert_nts( std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); // Reject samples with old timestamps - if (liveliness_lost.src_ts <= datawriter->monitor_service_data.liveliness_lost.back().src_ts) + if (!datawriter->monitor_service_data.liveliness_lost.empty() && + liveliness_lost.src_ts <= datawriter->monitor_service_data.liveliness_lost.back().src_ts) { break; } @@ -1961,7 +1998,8 @@ bool Database::insert_nts( std::shared_ptr datareader = std::const_pointer_cast(const_datareader); // Reject samples with old timestamps - if (liveliness_changed.src_ts <= datareader->monitor_service_data.liveliness_changed.back().src_ts) + if (!datareader->monitor_service_data.liveliness_changed.empty() && + liveliness_changed.src_ts <= datareader->monitor_service_data.liveliness_changed.back().src_ts) { break; } @@ -1988,7 +2026,8 @@ bool Database::insert_nts( std::shared_ptr datareader = std::const_pointer_cast(const_datareader); // Reject samples with old timestamps - if (deadline_missed.src_ts <= datareader->monitor_service_data.deadline_missed.back().src_ts) + if (!datareader->monitor_service_data.deadline_missed.empty() && + deadline_missed.src_ts <= datareader->monitor_service_data.deadline_missed.back().src_ts) { break; } @@ -2004,7 +2043,8 @@ bool Database::insert_nts( std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); // Reject samples with old timestamps - if (deadline_missed.src_ts <= datawriter->monitor_service_data.deadline_missed.back().src_ts) + if (!datawriter->monitor_service_data.deadline_missed.empty() && + deadline_missed.src_ts <= datawriter->monitor_service_data.deadline_missed.back().src_ts) { break; } @@ -2031,7 +2071,8 @@ bool Database::insert_nts( std::shared_ptr datareader = std::const_pointer_cast(const_datareader); // Reject samples with old timestamps - if (sample_lost.src_ts <= datareader->monitor_service_data.sample_lost.back().src_ts) + if (!datareader->monitor_service_data.sample_lost.empty() && + sample_lost.src_ts <= datareader->monitor_service_data.sample_lost.back().src_ts) { break; } diff --git a/test/unittest/Database/CMakeLists.txt b/test/unittest/Database/CMakeLists.txt index 971f5fd4..911930ae 100644 --- a/test/unittest/Database/CMakeLists.txt +++ b/test/unittest/Database/CMakeLists.txt @@ -492,6 +492,7 @@ set(DATABASE_DUMP_TEST_LIST dump_empty_entities_database dump_simple_database dump_complex_database + dump_complex_database_reject_old_data dump_unlinked_database id_to_string time_to_string diff --git a/test/unittest/Database/DatabaseDumpTests.cpp b/test/unittest/Database/DatabaseDumpTests.cpp index 53cf4209..4f80b6f9 100644 --- a/test/unittest/Database/DatabaseDumpTests.cpp +++ b/test/unittest/Database/DatabaseDumpTests.cpp @@ -291,7 +291,8 @@ void initialize_database( Database& db, int n_entity, int n_data, - bool link_process_participant = true) + bool link_process_participant = true, + bool insert_old_data = false) { for (int i = 0; i < n_entity; ++i) { @@ -302,6 +303,12 @@ void initialize_database( initialize_datawriter_data(db, i, j); initialize_datareader_data(db, i, j); } + if (n_data > 0 && insert_old_data) + { + initialize_participant_data(db, i, 0); + initialize_datawriter_data(db, i, 0); + initialize_datareader_data(db, i, 0); + } } } @@ -364,6 +371,17 @@ TEST(database, dump_complex_database) ASSERT_EQ(db.dump_database(), dump); } +// Test the dump of a database with three entities of each kind and three datas of each kind +// that after inserting new data, receives old samples +TEST(database, dump_complex_database_reject_old_data) +{ + Database db; + initialize_database(db, 3, 3, true, true); + DatabaseDump dump; + load_file(COMPLEX_DUMP_FILE, dump); + ASSERT_EQ(db.dump_database(), dump); +} + void initialize_empty_entities_unlinked( Database& db, int index) diff --git a/test/unittest/Database/DatabaseTests.cpp b/test/unittest/Database/DatabaseTests.cpp index 3d06ddcd..524161de 100644 --- a/test/unittest/Database/DatabaseTests.cpp +++ b/test/unittest/Database/DatabaseTests.cpp @@ -1818,7 +1818,7 @@ TEST_F(database_tests, insert_sample_history_latency) HistoryLatencySample sample_2; sample_2.reader = reader_id; sample_2.data = 13; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(writer->data.history2history_latency[reader_id].size(), 2u); @@ -1839,11 +1839,13 @@ TEST_F(database_tests, insert_sample_network_latency) NetworkLatencySample sample; sample.remote_locator = reader_locator->id; sample.data = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); NetworkLatencySample sample_2; sample_2.remote_locator = reader_locator->id; sample_2.data = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); ASSERT_EQ(participant->data.network_latency_per_locator[reader_locator->id].size(), 2u); @@ -1865,10 +1867,12 @@ TEST_F(database_tests, insert_sample_publication_throughput) { PublicationThroughputSample sample; sample.data = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample)); PublicationThroughputSample sample_2; sample_2.data = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(writer->data.publication_throughput.size(), 2u); @@ -1887,10 +1891,12 @@ TEST_F(database_tests, insert_sample_subscription_throughput) { SubscriptionThroughputSample sample; sample.data = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample)); SubscriptionThroughputSample sample_2; sample_2.data = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); ASSERT_EQ(reader->data.subscription_throughput.size(), 2u); @@ -1910,11 +1916,13 @@ TEST_F(database_tests, insert_sample_rtps_packets_sent) RtpsPacketsSentSample sample; sample.remote_locator = writer_locator->id; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); RtpsPacketsSentSample sample_2; sample_2.remote_locator = writer_locator->id; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); ASSERT_EQ(participant->data.rtps_packets_sent.size(), 1u); @@ -1956,12 +1964,14 @@ TEST_F(database_tests, insert_sample_rtps_bytes_sent) RtpsBytesSentSample sample; sample.remote_locator = writer_locator->id; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); sample.magnitude_order = 2; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); RtpsBytesSentSample sample_2; sample_2.remote_locator = writer_locator->id; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.magnitude_order = 3; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); @@ -2008,11 +2018,13 @@ TEST_F(database_tests, insert_sample_rtps_packets_lost) RtpsPacketsLostSample sample; sample.remote_locator = writer_locator->id; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); RtpsPacketsLostSample sample_2; sample_2.remote_locator = writer_locator->id; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); ASSERT_EQ(participant->data.rtps_packets_lost.size(), 1u); @@ -2053,12 +2065,14 @@ TEST_F(database_tests, insert_sample_rtps_bytes_lost) RtpsBytesLostSample sample; sample.remote_locator = writer_locator->id; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); sample.magnitude_order = 2; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); RtpsBytesLostSample sample_2; sample_2.remote_locator = writer_locator->id; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.magnitude_order = 3; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); @@ -2104,10 +2118,12 @@ TEST_F(database_tests, insert_sample_resent_data) { ResentDataSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample)); ResentDataSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(writer->data.resent_datas.size(), 2u); @@ -2128,10 +2144,12 @@ TEST_F(database_tests, insert_sample_heartbeat_count) { HeartbeatCountSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample)); HeartbeatCountSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(writer->data.heartbeat_count.size(), 2u); @@ -2152,10 +2170,12 @@ TEST_F(database_tests, insert_sample_acknack_count) { AcknackCountSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample)); AcknackCountSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); ASSERT_EQ(reader->data.acknack_count.size(), 2u); @@ -2176,10 +2196,12 @@ TEST_F(database_tests, insert_sample_nackfrag_count) { NackfragCountSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample)); NackfragCountSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); ASSERT_EQ(reader->data.nackfrag_count.size(), 2u); @@ -2200,10 +2222,12 @@ TEST_F(database_tests, insert_sample_gap_count) { GapCountSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample)); GapCountSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(writer->data.gap_count.size(), 2u); @@ -2224,10 +2248,12 @@ TEST_F(database_tests, insert_sample_data_count) { DataCountSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample)); DataCountSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(writer->data.data_count.size(), 2u); @@ -2248,10 +2274,12 @@ TEST_F(database_tests, insert_sample_pdp_packets) { PdpCountSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); PdpCountSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); ASSERT_EQ(participant->data.pdp_packets.size(), 2u); @@ -2272,10 +2300,12 @@ TEST_F(database_tests, insert_sample_edp_packets) { EdpCountSample sample; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); EdpCountSample sample_2; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); ASSERT_EQ(participant->data.edp_packets.size(), 2u); @@ -2297,12 +2327,14 @@ TEST_F(database_tests, insert_sample_discovery_time) DiscoveryTimeSample sample; sample.remote_entity = writer_id; sample.time = std::chrono::system_clock::now(); + sample.src_ts = std::chrono::system_clock::now(); sample.discovered = true; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample)); DiscoveryTimeSample sample_2; sample_2.remote_entity = writer_id; sample_2.time = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.discovered = true; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); @@ -2326,11 +2358,13 @@ TEST_F(database_tests, insert_sample_sample_datas) SampleDatasCountSample sample; sample.sequence_number = 2; sample.count = 12; + sample.src_ts = std::chrono::system_clock::now(); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample)); SampleDatasCountSample sample_2; sample_2.sequence_number = 3; sample_2.count = 13; + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(writer->data.sample_datas.size(), 2u); @@ -2341,6 +2375,7 @@ TEST_F(database_tests, insert_sample_sample_datas) SampleDatasCountSample sample_3; sample_3.sequence_number = 2; sample_3.count = 16; + sample_3.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(2); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_3)); ASSERT_EQ(writer->data.sample_datas.size(), 2u); @@ -2436,7 +2471,7 @@ TEST_F(database_tests, insert_monitor_service_sample_proxy) ProxySample sample_2; sample_2.kind = StatusKind::PROXY; sample_2.status = StatusLevel::OK_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.entity_proxy = {6, 7, 8, 9, 10}; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); ASSERT_EQ(participant->status, StatusLevel::OK_STATUS); @@ -2518,7 +2553,7 @@ TEST_F(database_tests, insert_monitor_service_sample_connection_list) connection_sample_2.used_locators({locator_2}); sample_2.kind = StatusKind::CONNECTION_LIST; sample_2.status = StatusLevel::OK_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.connection_list = {connection_sample_2, connection_sample_2}; ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); ASSERT_EQ(participant->status, StatusLevel::OK_STATUS); @@ -2587,7 +2622,7 @@ TEST_F(database_tests, insert_monitor_service_sample_incompatible_qos) IncompatibleQosSample sample_2; sample_2.kind = StatusKind::INCOMPATIBLE_QOS; sample_2.status = StatusLevel::ERROR_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.incompatible_qos_status.total_count(2); sample_2.incompatible_qos_status.last_policy_id(3); eprosima::fastdds::statistics::QosPolicyCountSeq_s qos_policy_count_seq_2; @@ -2644,7 +2679,7 @@ TEST_F(database_tests, insert_monitor_service_sample_inconsistent_topic) InconsistentTopicSample sample_2; sample_2.kind = StatusKind::INCONSISTENT_TOPIC; sample_2.status = StatusLevel::ERROR_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.inconsistent_topic_status.total_count(2); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(participant->status, StatusLevel::OK_STATUS); @@ -2686,7 +2721,7 @@ TEST_F(database_tests, insert_monitor_service_sample_liveliness_lost) LivelinessLostSample sample_2; sample_2.kind = StatusKind::LIVELINESS_LOST; sample_2.status = StatusLevel::WARNING_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.liveliness_lost_status.total_count(5); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); ASSERT_EQ(participant->status, StatusLevel::OK_STATUS); @@ -2729,7 +2764,7 @@ TEST_F(database_tests, insert_monitor_service_sample_liveliness_changed) LivelinessChangedSample sample_2; sample_2.kind = StatusKind::LIVELINESS_CHANGED; sample_2.status = StatusLevel::OK_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.liveliness_changed_status.alive_count(2); sample_2.liveliness_changed_status.not_alive_count(4); sample_2.liveliness_changed_status.last_publication_handle({1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}); @@ -2776,7 +2811,7 @@ TEST_F(database_tests, insert_monitor_service_sample_deadline_missed) DeadlineMissedSample sample_2; sample_2.kind = StatusKind::DEADLINE_MISSED; sample_2.status = StatusLevel::WARNING_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.deadline_missed_status.total_count(2); sample_2.deadline_missed_status.last_instance_handle({1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}); ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); @@ -2820,7 +2855,7 @@ TEST_F(database_tests, insert_monitor_service_sample_sample_lost) SampleLostSample sample_2; sample_2.kind = StatusKind::SAMPLE_LOST; sample_2.status = StatusLevel::WARNING_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); sample_2.sample_lost_status.total_count(2); ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); ASSERT_EQ(participant->status, StatusLevel::OK_STATUS); From 60d5663421887d64b4640ca6d666a4d95146c5ca Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Mon, 4 Mar 2024 10:57:19 +0100 Subject: [PATCH 3/4] Refs #20256: Fix for last reported samples load Signed-off-by: Jesus Perez --- src/cpp/database/database.cpp | 159 ++++++++++-------- test/unittest/Database/DatabaseTests.cpp | 198 +++++++++++++++++++++++ 2 files changed, 291 insertions(+), 66 deletions(-) diff --git a/src/cpp/database/database.cpp b/src/cpp/database/database.cpp index 840e3e03..e48f574f 100644 --- a/src/cpp/database/database.cpp +++ b/src/cpp/database/database.cpp @@ -943,11 +943,13 @@ void Database::insert_nts( if (writer != domain_writers->second.end()) { const HistoryLatencySample& fastdds_latency = dynamic_cast(sample); - + // Reject samples with old timestamps - if (writer->second->data.history2history_latency.find(fastdds_latency.reader) != - writer->second->data.history2history_latency.end() && - fastdds_latency.src_ts <= writer->second->data.history2history_latency[fastdds_latency.reader].back().src_ts) + if (writer->second->data.history2history_latency.find(fastdds_latency.reader) != + writer->second->data.history2history_latency.end() && + fastdds_latency.src_ts <= + writer->second->data.history2history_latency[fastdds_latency.reader].back(). + src_ts) { break; } @@ -973,8 +975,10 @@ void Database::insert_nts( // Reject samples with old timestamps if (participant->second->data.network_latency_per_locator.find(network_latency.remote_locator) != - participant->second->data.network_latency_per_locator.end() && - network_latency.src_ts <= participant->second->data.network_latency_per_locator[network_latency.remote_locator].back().src_ts) + participant->second->data.network_latency_per_locator.end() && + network_latency.src_ts <= + participant->second->data.network_latency_per_locator[network_latency. + remote_locator].back().src_ts) { break; } @@ -999,10 +1003,10 @@ void Database::insert_nts( { const PublicationThroughputSample& publication_throughput = dynamic_cast(sample); - + // Reject samples with old timestamps if (!writer->second->data.publication_throughput.empty() && - publication_throughput.src_ts <= writer->second->data.publication_throughput.back().src_ts) + publication_throughput.src_ts <= writer->second->data.publication_throughput.back().src_ts) { break; } @@ -1029,7 +1033,8 @@ void Database::insert_nts( // Reject samples with old timestamps if (!reader->second->data.subscription_throughput.empty() && - subscription_throughput.src_ts <= reader->second->data.subscription_throughput.back().src_ts) + subscription_throughput.src_ts <= + reader->second->data.subscription_throughput.back().src_ts) { break; } @@ -1053,11 +1058,11 @@ void Database::insert_nts( { const RtpsPacketsSentSample& rtps_packets_sent = dynamic_cast(sample); - // Reject samples with old timestamps - if (participant->second->data.rtps_packets_sent.find(rtps_packets_sent.remote_locator) != - participant->second->data.rtps_packets_sent.end() && - rtps_packets_sent.src_ts <= participant->second->data.last_reported_rtps_packets_sent_count[ - rtps_packets_sent.remote_locator].src_ts) + // Reject samples with old timestamps (unless we are loading last reported) + auto it = participant->second->data.rtps_packets_sent.find(rtps_packets_sent.remote_locator); + if (it != participant->second->data.rtps_packets_sent.end() && + rtps_packets_sent.src_ts <= it->second.back().src_ts && + !(loading && last_reported && rtps_packets_sent.src_ts == it->second.back().src_ts)) { break; } @@ -1113,11 +1118,11 @@ void Database::insert_nts( { const RtpsBytesSentSample& rtps_bytes_sent = dynamic_cast(sample); - // Reject samples with old timestamps - if (participant->second->data.rtps_bytes_sent.find(rtps_bytes_sent.remote_locator) != - participant->second->data.rtps_bytes_sent.end() && - rtps_bytes_sent.src_ts <= participant->second->data.last_reported_rtps_bytes_sent_count[ - rtps_bytes_sent.remote_locator].src_ts) + // Reject samples with old timestamps (unless we are loading last reported) + auto it = participant->second->data.rtps_bytes_sent.find(rtps_bytes_sent.remote_locator); + if (it != participant->second->data.rtps_bytes_sent.end() && + rtps_bytes_sent.src_ts <= it->second.back().src_ts && + !(loading && last_reported && rtps_bytes_sent.src_ts == it->second.back().src_ts)) { break; } @@ -1171,11 +1176,11 @@ void Database::insert_nts( { const RtpsPacketsLostSample& rtps_packets_lost = dynamic_cast(sample); - // Reject samples with old timestamps - if (participant->second->data.rtps_packets_lost.find(rtps_packets_lost.remote_locator) != - participant->second->data.rtps_packets_lost.end() && - rtps_packets_lost.src_ts <= participant->second->data.last_reported_rtps_packets_lost_count[ - rtps_packets_lost.remote_locator].src_ts) + // Reject samples with old timestamps (unless we are loading last reported) + auto it = participant->second->data.rtps_packets_lost.find(rtps_packets_lost.remote_locator); + if (it != participant->second->data.rtps_packets_lost.end() && + rtps_packets_lost.src_ts <= it->second.back().src_ts && + !(loading && last_reported && rtps_packets_lost.src_ts == it->second.back().src_ts)) { break; } @@ -1231,11 +1236,11 @@ void Database::insert_nts( { const RtpsBytesLostSample& rtps_bytes_lost = dynamic_cast(sample); - // Reject samples with old timestamps - if (participant->second->data.rtps_bytes_lost.find(rtps_bytes_lost.remote_locator) != - participant->second->data.rtps_bytes_lost.end() && - rtps_bytes_lost.src_ts <= participant->second->data.last_reported_rtps_bytes_lost_count[ - rtps_bytes_lost.remote_locator].src_ts) + // Reject samples with old timestamps (unless we are loading last reported) + auto it = participant->second->data.rtps_bytes_lost.find(rtps_bytes_lost.remote_locator); + if (it != participant->second->data.rtps_bytes_lost.end() && + rtps_bytes_lost.src_ts <= it->second.back().src_ts && + !(loading && last_reported && rtps_bytes_lost.src_ts == it->second.back().src_ts)) { break; } @@ -1289,9 +1294,11 @@ void Database::insert_nts( { const ResentDataSample& resent_datas = dynamic_cast(sample); - // Reject samples with old timestamps + // Reject samples with old timestamps (unless we are loading last reported) if (!writer->second->data.resent_datas.empty() && - resent_datas.src_ts <= writer->second->data.last_reported_resent_datas.src_ts) + resent_datas.src_ts <= writer->second->data.resent_datas.back().src_ts && + !(loading && last_reported && + resent_datas.src_ts == writer->second->data.resent_datas.back().src_ts)) { break; } @@ -1337,9 +1344,11 @@ void Database::insert_nts( { const HeartbeatCountSample& heartbeat_count = dynamic_cast(sample); - // Reject samples with old timestamps + // Reject samples with old timestamps (unless we are loading last reported) if (!writer->second->data.heartbeat_count.empty() && - heartbeat_count.src_ts <= writer->second->data.last_reported_heartbeat_count.src_ts) + heartbeat_count.src_ts <= writer->second->data.heartbeat_count.back().src_ts && + !(loading && last_reported && + heartbeat_count.src_ts == writer->second->data.heartbeat_count.back().src_ts)) { break; } @@ -1385,9 +1394,11 @@ void Database::insert_nts( { const AcknackCountSample& acknack_count = dynamic_cast(sample); - // Reject samples with old timestamps + // Reject samples with old timestamps (unless we are loading last reported) if (!reader->second->data.acknack_count.empty() && - acknack_count.src_ts <= reader->second->data.last_reported_acknack_count.src_ts) + acknack_count.src_ts <= reader->second->data.acknack_count.back().src_ts && + !(loading && last_reported && + acknack_count.src_ts == reader->second->data.acknack_count.back().src_ts)) { break; } @@ -1433,9 +1444,11 @@ void Database::insert_nts( { const NackfragCountSample& nackfrag_count = dynamic_cast(sample); - // Reject samples with old timestamps + // Reject samples with old timestamps (unless we are loading last reported) if (!reader->second->data.nackfrag_count.empty() && - nackfrag_count.src_ts <= reader->second->data.last_reported_nackfrag_count.src_ts) + nackfrag_count.src_ts <= reader->second->data.nackfrag_count.back().src_ts && + !(loading && last_reported && + nackfrag_count.src_ts == reader->second->data.nackfrag_count.back().src_ts)) { break; } @@ -1481,9 +1494,11 @@ void Database::insert_nts( { const GapCountSample& gap_count = dynamic_cast(sample); - // Reject samples with old timestamps + // Reject samples with old timestamps (unless we are loading last reported) if (!writer->second->data.gap_count.empty() && - gap_count.src_ts <= writer->second->data.last_reported_gap_count.src_ts) + gap_count.src_ts <= writer->second->data.gap_count.back().src_ts && + !(loading && last_reported && + gap_count.src_ts == writer->second->data.gap_count.back().src_ts)) { break; } @@ -1529,9 +1544,11 @@ void Database::insert_nts( { const DataCountSample& data_count = dynamic_cast(sample); - // Reject samples with old timestamps + // Reject samples with old timestamps (unless we are loading last reported) if (!writer->second->data.data_count.empty() && - data_count.src_ts <= writer->second->data.last_reported_data_count.src_ts) + data_count.src_ts <= writer->second->data.data_count.back().src_ts && + !(loading && last_reported && + data_count.src_ts == writer->second->data.data_count.back().src_ts)) { break; } @@ -1577,12 +1594,15 @@ void Database::insert_nts( { const PdpCountSample& pdp_packets = dynamic_cast(sample); - // Reject samples with old timestamps + // Reject samples with old timestamps (unless we are loading last reported) if (!participant->second->data.pdp_packets.empty() && - pdp_packets.src_ts <= participant->second->data.last_reported_pdp_packets.src_ts) + pdp_packets.src_ts <= participant->second->data.pdp_packets.back().src_ts && + !(loading && last_reported && + pdp_packets.src_ts == participant->second->data.pdp_packets.back().src_ts)) { break; } + // Check if the insertion is from the load if (loading) { @@ -1627,7 +1647,9 @@ void Database::insert_nts( // Reject samples with old timestamps if (!participant->second->data.edp_packets.empty() && - edp_packets.src_ts <= participant->second->data.last_reported_edp_packets.src_ts) + edp_packets.src_ts <= participant->second->data.edp_packets.back().src_ts && + !(loading && last_reported && + edp_packets.src_ts == participant->second->data.edp_packets.back().src_ts)) { break; } @@ -1675,8 +1697,10 @@ void Database::insert_nts( // Reject samples with old timestamps if (participant->second->data.discovered_entity.find(discovery_time.remote_entity) != - participant->second->data.discovered_entity.end() && - discovery_time.src_ts <= participant->second->data.discovered_entity[discovery_time.remote_entity].back().src_ts) + participant->second->data.discovered_entity.end() && + discovery_time.src_ts <= + participant->second->data.discovered_entity[discovery_time.remote_entity].back() + .src_ts) { break; } @@ -1702,8 +1726,9 @@ void Database::insert_nts( // Reject samples with old timestamps if (writer->second->data.sample_datas.find(sample_datas.sequence_number) != - writer->second->data.sample_datas.end() && - sample_datas.src_ts <= writer->second->data.sample_datas[sample_datas.sequence_number].back().src_ts) + writer->second->data.sample_datas.end() && + sample_datas.src_ts <= + writer->second->data.sample_datas[sample_datas.sequence_number].back().src_ts) { break; } @@ -1753,10 +1778,10 @@ bool Database::insert_nts( std::dynamic_pointer_cast(entity); std::shared_ptr participant = std::const_pointer_cast( const_participant); - + // Reject samples with old timestamps if (!participant->monitor_service_data.proxy.empty() && - proxy.src_ts <= participant->monitor_service_data.proxy.back().src_ts) + proxy.src_ts <= participant->monitor_service_data.proxy.back().src_ts) { break; } @@ -1772,7 +1797,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datareader->monitor_service_data.proxy.empty() && - proxy.src_ts <= datareader->monitor_service_data.proxy.back().src_ts) + proxy.src_ts <= datareader->monitor_service_data.proxy.back().src_ts) { break; } @@ -1788,7 +1813,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (datawriter->monitor_service_data.proxy.empty() && - proxy.src_ts <= datawriter->monitor_service_data.proxy.back().src_ts) + proxy.src_ts <= datawriter->monitor_service_data.proxy.back().src_ts) { break; } @@ -1818,11 +1843,11 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!participant->monitor_service_data.connection_list.empty() && - connection_list.src_ts <= participant->monitor_service_data.connection_list.back().src_ts) + connection_list.src_ts <= participant->monitor_service_data.connection_list.back().src_ts) { break; } - + participant->monitor_service_data.connection_list.push_back(connection_list); break; } @@ -1834,7 +1859,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datareader->monitor_service_data.connection_list.empty() && - connection_list.src_ts <= datareader->monitor_service_data.connection_list.back().src_ts) + connection_list.src_ts <= datareader->monitor_service_data.connection_list.back().src_ts) { break; } @@ -1850,7 +1875,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datawriter->monitor_service_data.connection_list.empty() && - connection_list.src_ts <= datawriter->monitor_service_data.connection_list.back().src_ts) + connection_list.src_ts <= datawriter->monitor_service_data.connection_list.back().src_ts) { break; } @@ -1879,7 +1904,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datareader->monitor_service_data.incompatible_qos.empty() && - incompatible_qos.src_ts <= datareader->monitor_service_data.incompatible_qos.back().src_ts) + incompatible_qos.src_ts <= datareader->monitor_service_data.incompatible_qos.back().src_ts) { break; } @@ -1896,7 +1921,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datawriter->monitor_service_data.incompatible_qos.empty() && - incompatible_qos.src_ts <= datawriter->monitor_service_data.incompatible_qos.back().src_ts) + incompatible_qos.src_ts <= datawriter->monitor_service_data.incompatible_qos.back().src_ts) { break; } @@ -1926,7 +1951,8 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datareader->monitor_service_data.inconsistent_topic.empty() && - inconsistent_topic.src_ts <= datareader->monitor_service_data.inconsistent_topic.back().src_ts) + inconsistent_topic.src_ts <= + datareader->monitor_service_data.inconsistent_topic.back().src_ts) { break; } @@ -1943,7 +1969,8 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datawriter->monitor_service_data.inconsistent_topic.empty() && - inconsistent_topic.src_ts <= datawriter->monitor_service_data.inconsistent_topic.back().src_ts) + inconsistent_topic.src_ts <= + datawriter->monitor_service_data.inconsistent_topic.back().src_ts) { break; } @@ -1972,7 +1999,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datawriter->monitor_service_data.liveliness_lost.empty() && - liveliness_lost.src_ts <= datawriter->monitor_service_data.liveliness_lost.back().src_ts) + liveliness_lost.src_ts <= datawriter->monitor_service_data.liveliness_lost.back().src_ts) { break; } @@ -1999,7 +2026,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datareader->monitor_service_data.liveliness_changed.empty() && - liveliness_changed.src_ts <= datareader->monitor_service_data.liveliness_changed.back().src_ts) + liveliness_changed.src_ts <= datareader->monitor_service_data.liveliness_changed.back().src_ts) { break; } @@ -2027,7 +2054,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datareader->monitor_service_data.deadline_missed.empty() && - deadline_missed.src_ts <= datareader->monitor_service_data.deadline_missed.back().src_ts) + deadline_missed.src_ts <= datareader->monitor_service_data.deadline_missed.back().src_ts) { break; } @@ -2044,7 +2071,7 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datawriter->monitor_service_data.deadline_missed.empty() && - deadline_missed.src_ts <= datawriter->monitor_service_data.deadline_missed.back().src_ts) + deadline_missed.src_ts <= datawriter->monitor_service_data.deadline_missed.back().src_ts) { break; } @@ -2072,11 +2099,11 @@ bool Database::insert_nts( // Reject samples with old timestamps if (!datareader->monitor_service_data.sample_lost.empty() && - sample_lost.src_ts <= datareader->monitor_service_data.sample_lost.back().src_ts) + sample_lost.src_ts <= datareader->monitor_service_data.sample_lost.back().src_ts) { break; } - + datareader->monitor_service_data.sample_lost.push_back(sample_lost); entity_updated = update_entity_status_nts(datareader); break; diff --git a/test/unittest/Database/DatabaseTests.cpp b/test/unittest/Database/DatabaseTests.cpp index 524161de..fda40b0a 100644 --- a/test/unittest/Database/DatabaseTests.cpp +++ b/test/unittest/Database/DatabaseTests.cpp @@ -1824,6 +1824,13 @@ TEST_F(database_tests, insert_sample_history_latency) ASSERT_EQ(writer->data.history2history_latency[reader_id].size(), 2u); ASSERT_EQ(writer->data.history2history_latency[reader_id][0], static_cast(sample)); ASSERT_EQ(writer->data.history2history_latency[reader_id][1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->data.history2history_latency[reader_id].size(), 2u); + ASSERT_EQ(writer->data.history2history_latency[reader_id][0], static_cast(sample)); + ASSERT_EQ(writer->data.history2history_latency[reader_id][1], static_cast(sample_2)); + } TEST_F(database_tests, insert_sample_history_latency_wrong_entity) @@ -1853,6 +1860,14 @@ TEST_F(database_tests, insert_sample_network_latency) static_cast(sample)); ASSERT_EQ(participant->data.network_latency_per_locator[reader_locator->id][1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.network_latency_per_locator[reader_locator->id].size(), 2u); + ASSERT_EQ(participant->data.network_latency_per_locator[reader_locator->id][0], + static_cast(sample)); + ASSERT_EQ(participant->data.network_latency_per_locator[reader_locator->id][1], + static_cast(sample_2)); } TEST_F(database_tests, insert_sample_network_latency_wrong_entity) @@ -1878,6 +1893,12 @@ TEST_F(database_tests, insert_sample_publication_throughput) ASSERT_EQ(writer->data.publication_throughput.size(), 2u); ASSERT_EQ(writer->data.publication_throughput[0], static_cast(sample)); ASSERT_EQ(writer->data.publication_throughput[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->data.publication_throughput.size(), 2u); + ASSERT_EQ(writer->data.publication_throughput[0], static_cast(sample)); + ASSERT_EQ(writer->data.publication_throughput[1], static_cast(sample_2)); } TEST_F(database_tests, insert_sample_publication_throughput_wrong_entity) @@ -1902,6 +1923,12 @@ TEST_F(database_tests, insert_sample_subscription_throughput) ASSERT_EQ(reader->data.subscription_throughput.size(), 2u); ASSERT_EQ(reader->data.subscription_throughput[0], static_cast(sample)); ASSERT_EQ(reader->data.subscription_throughput[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); + ASSERT_EQ(reader->data.subscription_throughput.size(), 2u); + ASSERT_EQ(reader->data.subscription_throughput[0], static_cast(sample)); + ASSERT_EQ(reader->data.subscription_throughput[1], static_cast(sample_2)); } TEST_F(database_tests, insert_sample_subscription_throughput_wrong_entity) @@ -1932,6 +1959,14 @@ TEST_F(database_tests, insert_sample_rtps_packets_sent) static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(participant->data.last_reported_rtps_packets_sent_count[writer_locator->id].count, sample_2.count); + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.rtps_packets_sent.size(), 1u); + ASSERT_EQ(participant->data.rtps_packets_sent[writer_locator->id].size(), 2u); + ASSERT_EQ(participant->data.rtps_packets_sent[writer_locator->id][0], static_cast(sample)); + ASSERT_EQ(participant->data.rtps_packets_sent[writer_locator->id][1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(participant->data.last_reported_rtps_packets_sent_count[writer_locator->id].count, sample_2.count); } TEST_F(database_tests, insert_sample_rtps_packets_sent_wrong_entity) @@ -1983,6 +2018,17 @@ TEST_F(database_tests, insert_sample_rtps_bytes_sent) ASSERT_EQ(participant->data.last_reported_rtps_bytes_sent_count[writer_locator->id].magnitude_order, sample_2.magnitude_order); ASSERT_EQ(participant->data.last_reported_rtps_bytes_sent_count[writer_locator->id].count, sample_2.count); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.rtps_bytes_sent.size(), 1u); + ASSERT_EQ(participant->data.rtps_bytes_sent[writer_locator->id].size(), 2u); + ASSERT_EQ(participant->data.rtps_bytes_sent[writer_locator->id][0], static_cast(sample)); + ASSERT_EQ(participant->data.rtps_bytes_sent[writer_locator->id][1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(participant->data.last_reported_rtps_bytes_sent_count[writer_locator->id].magnitude_order, + sample_2.magnitude_order); + ASSERT_EQ(participant->data.last_reported_rtps_bytes_sent_count[writer_locator->id].count, sample_2.count); } TEST_F(database_tests, insert_sample_rtps_bytes_sent_wrong_entity) @@ -2033,6 +2079,15 @@ TEST_F(database_tests, insert_sample_rtps_packets_lost) ASSERT_EQ(participant->data.rtps_packets_lost[writer_locator->id][1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(participant->data.last_reported_rtps_packets_lost_count[writer_locator->id].count, sample_2.count); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.rtps_packets_lost.size(), 1u); + ASSERT_EQ(participant->data.rtps_packets_lost[writer_locator->id].size(), 2u); + ASSERT_EQ(participant->data.rtps_packets_lost[writer_locator->id][0], static_cast(sample)); + ASSERT_EQ(participant->data.rtps_packets_lost[writer_locator->id][1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(participant->data.last_reported_rtps_packets_lost_count[writer_locator->id].count, sample_2.count); } TEST_F(database_tests, insert_sample_rtps_packets_lost_wrong_entity) @@ -2084,6 +2139,17 @@ TEST_F(database_tests, insert_sample_rtps_bytes_lost) ASSERT_EQ(participant->data.last_reported_rtps_bytes_lost_count[writer_locator->id].magnitude_order, sample_2.magnitude_order); ASSERT_EQ(participant->data.last_reported_rtps_bytes_lost_count[writer_locator->id].count, sample_2.count); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.rtps_bytes_lost.size(), 1u); + ASSERT_EQ(participant->data.rtps_bytes_lost[writer_locator->id].size(), 2u); + ASSERT_EQ(participant->data.rtps_bytes_lost[writer_locator->id][0], static_cast(sample)); + ASSERT_EQ(participant->data.rtps_bytes_lost[writer_locator->id][1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(participant->data.last_reported_rtps_bytes_lost_count[writer_locator->id].magnitude_order, + sample_2.magnitude_order); + ASSERT_EQ(participant->data.last_reported_rtps_bytes_lost_count[writer_locator->id].count, sample_2.count); } TEST_F(database_tests, insert_sample_rtps_bytes_lost_wrong_entity) @@ -2131,6 +2197,14 @@ TEST_F(database_tests, insert_sample_resent_data) ASSERT_EQ(writer->data.resent_datas[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(writer->data.last_reported_resent_datas, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->data.resent_datas.size(), 2u); + ASSERT_EQ(writer->data.resent_datas[0], static_cast(sample)); + ASSERT_EQ(writer->data.resent_datas[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(writer->data.last_reported_resent_datas, sample_2); } TEST_F(database_tests, insert_sample_resent_data_wrong_entity) @@ -2157,6 +2231,14 @@ TEST_F(database_tests, insert_sample_heartbeat_count) ASSERT_EQ(writer->data.heartbeat_count[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(writer->data.last_reported_heartbeat_count, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->data.heartbeat_count.size(), 2u); + ASSERT_EQ(writer->data.heartbeat_count[0], static_cast(sample)); + ASSERT_EQ(writer->data.heartbeat_count[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(writer->data.last_reported_heartbeat_count, sample_2); } TEST_F(database_tests, insert_sample_heartbeat_count_wrong_entity) @@ -2183,6 +2265,14 @@ TEST_F(database_tests, insert_sample_acknack_count) ASSERT_EQ(reader->data.acknack_count[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(reader->data.last_reported_acknack_count, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); + ASSERT_EQ(reader->data.acknack_count.size(), 2u); + ASSERT_EQ(reader->data.acknack_count[0], static_cast(sample)); + ASSERT_EQ(reader->data.acknack_count[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(reader->data.last_reported_acknack_count, sample_2); } TEST_F(database_tests, insert_sample_acknack_count_wrong_entity) @@ -2209,6 +2299,14 @@ TEST_F(database_tests, insert_sample_nackfrag_count) ASSERT_EQ(reader->data.nackfrag_count[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(reader->data.last_reported_nackfrag_count, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); + ASSERT_EQ(reader->data.nackfrag_count.size(), 2u); + ASSERT_EQ(reader->data.nackfrag_count[0], static_cast(sample)); + ASSERT_EQ(reader->data.nackfrag_count[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(reader->data.last_reported_nackfrag_count, sample_2); } TEST_F(database_tests, insert_sample_nackfrag_count_wrong_entity) @@ -2235,6 +2333,14 @@ TEST_F(database_tests, insert_sample_gap_count) ASSERT_EQ(writer->data.gap_count[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(writer->data.last_reported_gap_count, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->data.gap_count.size(), 2u); + ASSERT_EQ(writer->data.gap_count[0], static_cast(sample)); + ASSERT_EQ(writer->data.gap_count[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(writer->data.last_reported_gap_count, sample_2); } TEST_F(database_tests, insert_sample_gap_count_wrong_entity) @@ -2261,6 +2367,14 @@ TEST_F(database_tests, insert_sample_data_count) ASSERT_EQ(writer->data.data_count[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(writer->data.last_reported_data_count, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->data.data_count.size(), 2u); + ASSERT_EQ(writer->data.data_count[0], static_cast(sample)); + ASSERT_EQ(writer->data.data_count[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(writer->data.last_reported_data_count, sample_2); } TEST_F(database_tests, insert_sample_data_count_wrong_entity) @@ -2287,6 +2401,14 @@ TEST_F(database_tests, insert_sample_pdp_packets) ASSERT_EQ(participant->data.pdp_packets[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(participant->data.last_reported_pdp_packets, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.pdp_packets.size(), 2u); + ASSERT_EQ(participant->data.pdp_packets[0], static_cast(sample)); + ASSERT_EQ(participant->data.pdp_packets[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(participant->data.last_reported_pdp_packets, sample_2); } TEST_F(database_tests, insert_sample_pdp_packets_wrong_entity) @@ -2313,6 +2435,14 @@ TEST_F(database_tests, insert_sample_edp_packets) ASSERT_EQ(participant->data.edp_packets[1], static_cast(sample_2) - static_cast(sample)); ASSERT_EQ(participant->data.last_reported_edp_packets, sample_2); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.edp_packets.size(), 2u); + ASSERT_EQ(participant->data.edp_packets[0], static_cast(sample)); + ASSERT_EQ(participant->data.edp_packets[1], + static_cast(sample_2) - static_cast(sample)); + ASSERT_EQ(participant->data.last_reported_edp_packets, sample_2); } TEST_F(database_tests, insert_sample_edp_packets_wrong_entity) @@ -2342,6 +2472,13 @@ TEST_F(database_tests, insert_sample_discovery_time) ASSERT_EQ(participant->data.discovered_entity[writer_id].size(), 2u); ASSERT_EQ(participant->data.discovered_entity[writer_id][0], static_cast(sample)); ASSERT_EQ(participant->data.discovered_entity[writer_id][1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->data.discovered_entity.size(), 1u); + ASSERT_EQ(participant->data.discovered_entity[writer_id].size(), 2u); + ASSERT_EQ(participant->data.discovered_entity[writer_id][0], static_cast(sample)); + ASSERT_EQ(participant->data.discovered_entity[writer_id][1], static_cast(sample_2)); } TEST_F(database_tests, insert_sample_discovery_time_wrong_entity) @@ -2381,6 +2518,12 @@ TEST_F(database_tests, insert_sample_sample_datas) ASSERT_EQ(writer->data.sample_datas.size(), 2u); ASSERT_EQ(writer->data.sample_datas[sample.sequence_number].size(), 1u); ASSERT_EQ(writer->data.sample_datas[sample.sequence_number][0], static_cast(sample_3)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_3)); + ASSERT_EQ(writer->data.sample_datas.size(), 2u); + ASSERT_EQ(writer->data.sample_datas[sample.sequence_number].size(), 1u); + ASSERT_EQ(writer->data.sample_datas[sample.sequence_number][0], static_cast(sample_3)); } TEST_F(database_tests, insert_sample_sample_datas_wrong_entity) @@ -2483,6 +2626,14 @@ TEST_F(database_tests, insert_monitor_service_sample_proxy) ASSERT_EQ(reader->monitor_service_data.proxy.size(), 1u); ASSERT_EQ(participant->monitor_service_data.proxy[0], static_cast(sample)); ASSERT_EQ(participant->monitor_service_data.proxy[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->monitor_service_data.proxy.size(), 2u); + ASSERT_EQ(writer->monitor_service_data.proxy.size(), 1u); + ASSERT_EQ(reader->monitor_service_data.proxy.size(), 1u); + ASSERT_EQ(participant->monitor_service_data.proxy[0], static_cast(sample)); + ASSERT_EQ(participant->monitor_service_data.proxy[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_proxy_wrong_entity) @@ -2565,6 +2716,14 @@ TEST_F(database_tests, insert_monitor_service_sample_connection_list) ASSERT_EQ(reader->monitor_service_data.connection_list.size(), 1u); ASSERT_EQ(participant->monitor_service_data.connection_list[0], static_cast(sample)); ASSERT_EQ(participant->monitor_service_data.connection_list[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, participant_id, sample_2)); + ASSERT_EQ(participant->monitor_service_data.connection_list.size(), 2u); + ASSERT_EQ(writer->monitor_service_data.connection_list.size(), 1u); + ASSERT_EQ(reader->monitor_service_data.connection_list.size(), 1u); + ASSERT_EQ(participant->monitor_service_data.connection_list[0], static_cast(sample)); + ASSERT_EQ(participant->monitor_service_data.connection_list[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_connection_list_wrong_entity) @@ -2640,6 +2799,13 @@ TEST_F(database_tests, insert_monitor_service_sample_incompatible_qos) ASSERT_EQ(reader->monitor_service_data.incompatible_qos.size(), 1u); ASSERT_EQ(writer->monitor_service_data.incompatible_qos[0], static_cast(sample)); ASSERT_EQ(writer->monitor_service_data.incompatible_qos[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->monitor_service_data.incompatible_qos.size(), 2u); + ASSERT_EQ(reader->monitor_service_data.incompatible_qos.size(), 1u); + ASSERT_EQ(writer->monitor_service_data.incompatible_qos[0], static_cast(sample)); + ASSERT_EQ(writer->monitor_service_data.incompatible_qos[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_incompatible_qos_wrong_entity) @@ -2690,6 +2856,13 @@ TEST_F(database_tests, insert_monitor_service_sample_inconsistent_topic) ASSERT_EQ(reader->monitor_service_data.inconsistent_topic.size(), 1u); ASSERT_EQ(writer->monitor_service_data.inconsistent_topic[0], static_cast(sample)); ASSERT_EQ(writer->monitor_service_data.inconsistent_topic[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->monitor_service_data.inconsistent_topic.size(), 2u); + ASSERT_EQ(reader->monitor_service_data.inconsistent_topic.size(), 1u); + ASSERT_EQ(writer->monitor_service_data.inconsistent_topic[0], static_cast(sample)); + ASSERT_EQ(writer->monitor_service_data.inconsistent_topic[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_inconsistent_topic_wrong_entity) @@ -2731,6 +2904,12 @@ TEST_F(database_tests, insert_monitor_service_sample_liveliness_lost) ASSERT_EQ(writer->monitor_service_data.liveliness_lost.size(), 2u); ASSERT_EQ(writer->monitor_service_data.liveliness_lost[0], static_cast(sample)); ASSERT_EQ(writer->monitor_service_data.liveliness_lost[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->monitor_service_data.liveliness_lost.size(), 2u); + ASSERT_EQ(writer->monitor_service_data.liveliness_lost[0], static_cast(sample)); + ASSERT_EQ(writer->monitor_service_data.liveliness_lost[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_liveliness_lost_wrong_entity) @@ -2776,6 +2955,12 @@ TEST_F(database_tests, insert_monitor_service_sample_liveliness_changed) ASSERT_EQ(reader->monitor_service_data.liveliness_changed.size(), 2u); ASSERT_EQ(reader->monitor_service_data.liveliness_changed[0], static_cast(sample)); ASSERT_EQ(reader->monitor_service_data.liveliness_changed[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); + ASSERT_EQ(reader->monitor_service_data.liveliness_changed.size(), 2u); + ASSERT_EQ(reader->monitor_service_data.liveliness_changed[0], static_cast(sample)); + ASSERT_EQ(reader->monitor_service_data.liveliness_changed[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_liveliness_changed_wrong_entity) @@ -2823,6 +3008,13 @@ TEST_F(database_tests, insert_monitor_service_sample_deadline_missed) ASSERT_EQ(reader->monitor_service_data.deadline_missed.size(), 1u); ASSERT_EQ(writer->monitor_service_data.deadline_missed[0], static_cast(sample)); ASSERT_EQ(writer->monitor_service_data.deadline_missed[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->monitor_service_data.deadline_missed.size(), 2u); + ASSERT_EQ(reader->monitor_service_data.deadline_missed.size(), 1u); + ASSERT_EQ(writer->monitor_service_data.deadline_missed[0], static_cast(sample)); + ASSERT_EQ(writer->monitor_service_data.deadline_missed[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_deadline_missed_wrong_entity) @@ -2865,6 +3057,12 @@ TEST_F(database_tests, insert_monitor_service_sample_sample_lost) ASSERT_EQ(reader->monitor_service_data.sample_lost.size(), 2u); ASSERT_EQ(reader->monitor_service_data.sample_lost[0], static_cast(sample)); ASSERT_EQ(reader->monitor_service_data.sample_lost[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, reader_id, sample_2)); + ASSERT_EQ(reader->monitor_service_data.sample_lost.size(), 2u); + ASSERT_EQ(reader->monitor_service_data.sample_lost[0], static_cast(sample)); + ASSERT_EQ(reader->monitor_service_data.sample_lost[1], static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_sample_lost_wrong_entity) From 55397ee9909a1fef9d6e708d169d6337ff3ebe2e Mon Sep 17 00:00:00 2001 From: Carlosespicur Date: Tue, 10 Dec 2024 10:28:32 +0100 Subject: [PATCH 4/4] Refs #20256: Apply changes to ExtendedIncompatibleQos samples & fix errors Signed-off-by: Carlosespicur --- src/cpp/database/database.cpp | 20 +++++++++++++++++++- test/unittest/Database/DatabaseTests.cpp | 11 ++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/cpp/database/database.cpp b/src/cpp/database/database.cpp index e48f574f..77eda546 100644 --- a/src/cpp/database/database.cpp +++ b/src/cpp/database/database.cpp @@ -1812,7 +1812,7 @@ bool Database::insert_nts( std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); // Reject samples with old timestamps - if (datawriter->monitor_service_data.proxy.empty() && + if (!datawriter->monitor_service_data.proxy.empty() && proxy.src_ts <= datawriter->monitor_service_data.proxy.back().src_ts) { break; @@ -2126,6 +2126,15 @@ bool Database::insert_nts( std::shared_ptr const_datareader = std::dynamic_pointer_cast( entity); std::shared_ptr datareader = std::const_pointer_cast(const_datareader); + + // Reject samples with old timestamps + if (!datareader->monitor_service_data.extended_incompatible_qos.empty() && + extended_incompatible_qos.src_ts <= + datareader->monitor_service_data.extended_incompatible_qos.back().src_ts) + { + break; + } + datareader->monitor_service_data.extended_incompatible_qos.push_back(extended_incompatible_qos); entity_updated = update_entity_status_nts(datareader); break; @@ -2135,6 +2144,15 @@ bool Database::insert_nts( std::shared_ptr const_datawriter = std::dynamic_pointer_cast( entity); std::shared_ptr datawriter = std::const_pointer_cast(const_datawriter); + + // Reject samples with old timestamps + if (!datawriter->monitor_service_data.extended_incompatible_qos.empty() && + extended_incompatible_qos.src_ts <= + datawriter->monitor_service_data.extended_incompatible_qos.back().src_ts) + { + break; + } + datawriter->monitor_service_data.extended_incompatible_qos.push_back(extended_incompatible_qos); entity_updated = update_entity_status_nts(datawriter); break; diff --git a/test/unittest/Database/DatabaseTests.cpp b/test/unittest/Database/DatabaseTests.cpp index fda40b0a..0909697b 100644 --- a/test/unittest/Database/DatabaseTests.cpp +++ b/test/unittest/Database/DatabaseTests.cpp @@ -3108,7 +3108,7 @@ TEST_F(database_tests, insert_monitor_service_sample_extended_incompatible_qos) ExtendedIncompatibleQosSample sample_2; sample_2.kind = StatusKind::EXTENDED_INCOMPATIBLE_QOS; sample_2.status = StatusLevel::ERROR_STATUS; - sample_2.src_ts = std::chrono::system_clock::now(); + sample_2.src_ts = std::chrono::system_clock::now() + std::chrono::seconds(1); status.current_incompatible_policies(std::vector{1, 2}); sample_2.extended_incompatible_qos_status = {status}; ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); @@ -3122,6 +3122,15 @@ TEST_F(database_tests, insert_monitor_service_sample_extended_incompatible_qos) static_cast(sample)); ASSERT_EQ(writer->monitor_service_data.extended_incompatible_qos[1], static_cast(sample_2)); + + // Insert old sample - should not be inserted + ASSERT_NO_THROW(db.insert(domain_id, writer_id, sample_2)); + ASSERT_EQ(writer->monitor_service_data.extended_incompatible_qos.size(), 2u); + ASSERT_EQ(reader->monitor_service_data.extended_incompatible_qos.size(), 1u); + ASSERT_EQ(writer->monitor_service_data.extended_incompatible_qos[0], + static_cast(sample)); + ASSERT_EQ(writer->monitor_service_data.extended_incompatible_qos[1], + static_cast(sample_2)); } TEST_F(database_tests, insert_monitor_service_sample_extended_incompatible_qos_wrong_entity)