@@ -123,6 +123,7 @@ class IntraProcessBuffer : public IntraProcessBufferBase
123
123
return result;
124
124
}
125
125
126
+ private:
126
127
// need to store the messages somewhere otherwise the memory address will be reused
127
128
ConstMessageSharedPtr shared_msg;
128
129
MessageUniquePtr unique_msg;
@@ -158,9 +159,9 @@ class PublisherBase
158
159
public:
159
160
RCLCPP_SMART_PTR_DEFINITIONS (PublisherBase)
160
161
161
- explicit PublisherBase (rclcpp::QoS qos = rclcpp::QoS( 10 ) )
162
- : qos_profile(qos ),
163
- topic_name( " topic " )
162
+ explicit PublisherBase (const std::string & topic, const rclcpp::QoS & qos )
163
+ : topic_name(topic ),
164
+ qos_profile(qos )
164
165
{}
165
166
166
167
virtual ~PublisherBase ()
@@ -205,10 +206,12 @@ class PublisherBase
205
206
return false ;
206
207
}
207
208
208
- rclcpp::QoS qos_profile;
209
- std::string topic_name;
210
209
uint64_t intra_process_publisher_id_;
211
210
IntraProcessManagerWeakPtr weak_ipm_;
211
+
212
+ private:
213
+ std::string topic_name;
214
+ rclcpp::QoS qos_profile;
212
215
};
213
216
214
217
template <typename T, typename Alloc = std::allocator<void >>
@@ -223,8 +226,8 @@ class Publisher : public PublisherBase
223
226
224
227
RCLCPP_SMART_PTR_DEFINITIONS (Publisher<T, Alloc>)
225
228
226
- explicit Publisher (rclcpp::QoS qos = rclcpp::QoS( 10 ) )
227
- : PublisherBase(qos)
229
+ explicit Publisher (const std::string & topic, const rclcpp::QoS & qos )
230
+ : PublisherBase(topic, qos)
228
231
{
229
232
auto allocator = std::make_shared<Alloc>();
230
233
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get ());
@@ -258,9 +261,9 @@ class SubscriptionIntraProcessBase
258
261
259
262
explicit SubscriptionIntraProcessBase (
260
263
rclcpp::Context::SharedPtr context,
261
- const std::string & topic = " topic " ,
262
- rclcpp::QoS qos = rclcpp::QoS( 10 ) )
263
- : qos_profile(qos ), topic_name(topic )
264
+ const std::string & topic,
265
+ const rclcpp::QoS & qos )
266
+ : topic_name(topic ), qos_profile(qos )
264
267
{
265
268
(void )context;
266
269
}
@@ -292,8 +295,8 @@ class SubscriptionIntraProcessBase
292
295
size_t
293
296
available_capacity () const = 0 ;
294
297
295
- rclcpp::QoS qos_profile;
296
298
std::string topic_name;
299
+ rclcpp::QoS qos_profile;
297
300
};
298
301
299
302
template <
@@ -307,8 +310,8 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
307
310
public:
308
311
RCLCPP_SMART_PTR_DEFINITIONS (SubscriptionIntraProcessBuffer)
309
312
310
- explicit SubscriptionIntraProcessBuffer (rclcpp::QoS qos)
311
- : SubscriptionIntraProcessBase(nullptr , " topic" , qos), take_shared_method(false )
313
+ explicit SubscriptionIntraProcessBuffer (const std::string & topic, const rclcpp::QoS & qos)
314
+ : SubscriptionIntraProcessBase(nullptr , topic, qos), take_shared_method(false )
312
315
{
313
316
buffer = std::make_unique<rclcpp::experimental::buffers::mock::IntraProcessBuffer<MessageT>>();
314
317
}
@@ -375,8 +378,8 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer<
375
378
public:
376
379
RCLCPP_SMART_PTR_DEFINITIONS (SubscriptionIntraProcess)
377
380
378
- explicit SubscriptionIntraProcess (rclcpp::QoS qos = rclcpp::QoS( 10 ) )
379
- : SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>(qos)
381
+ explicit SubscriptionIntraProcess (const std::string & topic, const rclcpp::QoS & qos )
382
+ : SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>(topic, qos)
380
383
{
381
384
}
382
385
};
@@ -466,12 +469,11 @@ TEST(TestIntraProcessManager, add_pub_sub) {
466
469
467
470
auto ipm = std::make_shared<IntraProcessManagerT>();
468
471
469
- auto p1 = std::make_shared<PublisherT>(rclcpp::QoS (10 ).best_effort ());
472
+ auto p1 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS (10 ).best_effort ());
470
473
471
- auto p2 = std::make_shared<PublisherT>(rclcpp::QoS (10 ).best_effort ());
472
- p2->topic_name = " different_topic_name" ;
474
+ auto p2 = std::make_shared<PublisherT>(" different_topic_name" , rclcpp::QoS (10 ).best_effort ());
473
475
474
- auto s1 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS (10 ).best_effort ());
476
+ auto s1 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS (10 ).best_effort ());
475
477
476
478
auto p1_id = ipm->add_publisher (p1);
477
479
auto p2_id = ipm->add_publisher (p2);
@@ -480,24 +482,42 @@ TEST(TestIntraProcessManager, add_pub_sub) {
480
482
bool unique_ids = p1_id != p2_id && p2_id != s1_id;
481
483
ASSERT_TRUE (unique_ids);
482
484
485
+ // p1 has 1 subcription, s1
483
486
size_t p1_subs = ipm->get_subscription_count (p1_id);
487
+ // p2 has 0 subscriptions
484
488
size_t p2_subs = ipm->get_subscription_count (p2_id);
489
+ // Non-existent publisher_id has 0 subscriptions
485
490
size_t non_existing_pub_subs = ipm->get_subscription_count (42 );
486
491
ASSERT_EQ (1u , p1_subs);
487
492
ASSERT_EQ (0u , p2_subs);
488
493
ASSERT_EQ (0u , non_existing_pub_subs);
489
494
490
- auto p3 = std::make_shared<PublisherT>(rclcpp::QoS (10 ).reliable ());
495
+ auto p3 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS (10 ).reliable ());
491
496
492
- auto s2 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS (10 ).reliable ());
497
+ auto s2 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS (10 ).reliable ());
493
498
499
+ // s2 may be able to communicate with p1 depending on the RMW
494
500
auto s2_id = ipm->template add_subscription <MessageT>(s2);
501
+ // p3 can definitely communicate with s2, may be able to communicate with s1 depending on the RMW
495
502
auto p3_id = ipm->add_publisher (p3);
496
503
504
+ // p1 definitely matches subscription s1, since the topic name and QoS match exactly.
505
+ // If the RMW can match best-effort publishers to reliable subscriptions (like Zenoh can),
506
+ // then p1 will also match s2.
497
507
p1_subs = ipm->get_subscription_count (p1_id);
508
+ // No subscriptions with a topic name of "different_topic_name" were added.
498
509
p2_subs = ipm->get_subscription_count (p2_id);
510
+ // On all current RMWs (DDS and Zenoh), a reliable publisher like p3 can communicate with both
511
+ // reliable and best-effort subscriptions (s1 and s2).
499
512
size_t p3_subs = ipm->get_subscription_count (p3_id);
500
- ASSERT_EQ (1u , p1_subs);
513
+
514
+ rclcpp::QoSCheckCompatibleResult qos_compatible =
515
+ rclcpp::qos_check_compatible (p1->get_actual_qos (), s2->get_actual_qos ());
516
+ if (qos_compatible.compatibility == rclcpp::QoSCompatibility::Error) {
517
+ ASSERT_EQ (1u , p1_subs);
518
+ } else {
519
+ ASSERT_EQ (2u , p1_subs);
520
+ }
501
521
ASSERT_EQ (0u , p2_subs);
502
522
ASSERT_EQ (2u , p3_subs);
503
523
@@ -528,11 +548,11 @@ TEST(TestIntraProcessManager, single_subscription) {
528
548
529
549
auto ipm = std::make_shared<IntraProcessManagerT>();
530
550
531
- auto p1 = std::make_shared<PublisherT>();
551
+ auto p1 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS ( 10 ) );
532
552
auto p1_id = ipm->add_publisher (p1);
533
553
p1->set_intra_process_manager (p1_id, ipm);
534
554
535
- auto s1 = std::make_shared<SubscriptionIntraProcessT>();
555
+ auto s1 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
536
556
s1->take_shared_method = false ;
537
557
auto s1_id = ipm->template add_subscription <MessageT>(s1);
538
558
@@ -543,7 +563,7 @@ TEST(TestIntraProcessManager, single_subscription) {
543
563
ASSERT_EQ (original_message_pointer, received_message_pointer_1);
544
564
545
565
ipm->remove_subscription (s1_id);
546
- auto s2 = std::make_shared<SubscriptionIntraProcessT>();
566
+ auto s2 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
547
567
s2->take_shared_method = true ;
548
568
auto s2_id = ipm->template add_subscription <MessageT>(s2);
549
569
(void )s2_id;
@@ -582,15 +602,15 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) {
582
602
583
603
auto ipm = std::make_shared<IntraProcessManagerT>();
584
604
585
- auto p1 = std::make_shared<PublisherT>();
605
+ auto p1 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS ( 10 ) );
586
606
auto p1_id = ipm->add_publisher (p1);
587
607
p1->set_intra_process_manager (p1_id, ipm);
588
608
589
- auto s1 = std::make_shared<SubscriptionIntraProcessT>();
609
+ auto s1 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
590
610
s1->take_shared_method = false ;
591
611
auto s1_id = ipm->template add_subscription <MessageT>(s1);
592
612
593
- auto s2 = std::make_shared<SubscriptionIntraProcessT>();
613
+ auto s2 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
594
614
s2->take_shared_method = false ;
595
615
auto s2_id = ipm->template add_subscription <MessageT>(s2);
596
616
@@ -606,11 +626,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) {
606
626
ipm->remove_subscription (s1_id);
607
627
ipm->remove_subscription (s2_id);
608
628
609
- auto s3 = std::make_shared<SubscriptionIntraProcessT>();
629
+ auto s3 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
610
630
s3->take_shared_method = true ;
611
631
auto s3_id = ipm->template add_subscription <MessageT>(s3);
612
632
613
- auto s4 = std::make_shared<SubscriptionIntraProcessT>();
633
+ auto s4 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
614
634
s4->take_shared_method = true ;
615
635
auto s4_id = ipm->template add_subscription <MessageT>(s4);
616
636
@@ -625,11 +645,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) {
625
645
ipm->remove_subscription (s3_id);
626
646
ipm->remove_subscription (s4_id);
627
647
628
- auto s5 = std::make_shared<SubscriptionIntraProcessT>();
648
+ auto s5 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
629
649
s5->take_shared_method = false ;
630
650
auto s5_id = ipm->template add_subscription <MessageT>(s5);
631
651
632
- auto s6 = std::make_shared<SubscriptionIntraProcessT>();
652
+ auto s6 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
633
653
s6->take_shared_method = false ;
634
654
auto s6_id = ipm->template add_subscription <MessageT>(s6);
635
655
@@ -645,12 +665,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) {
645
665
ipm->remove_subscription (s5_id);
646
666
ipm->remove_subscription (s6_id);
647
667
648
- auto s7 = std::make_shared<SubscriptionIntraProcessT>();
668
+ auto s7 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
649
669
s7->take_shared_method = true ;
650
670
auto s7_id = ipm->template add_subscription <MessageT>(s7);
651
671
(void )s7_id;
652
672
653
- auto s8 = std::make_shared<SubscriptionIntraProcessT>();
673
+ auto s8 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
654
674
s8->take_shared_method = true ;
655
675
auto s8_id = ipm->template add_subscription <MessageT>(s8);
656
676
(void )s8_id;
@@ -688,15 +708,15 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) {
688
708
689
709
auto ipm = std::make_shared<IntraProcessManagerT>();
690
710
691
- auto p1 = std::make_shared<PublisherT>();
711
+ auto p1 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS ( 10 ) );
692
712
auto p1_id = ipm->add_publisher (p1);
693
713
p1->set_intra_process_manager (p1_id, ipm);
694
714
695
- auto s1 = std::make_shared<SubscriptionIntraProcessT>();
715
+ auto s1 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
696
716
s1->take_shared_method = true ;
697
717
auto s1_id = ipm->template add_subscription <MessageT>(s1);
698
718
699
- auto s2 = std::make_shared<SubscriptionIntraProcessT>();
719
+ auto s2 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
700
720
s2->take_shared_method = false ;
701
721
auto s2_id = ipm->template add_subscription <MessageT>(s2);
702
722
@@ -711,15 +731,15 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) {
711
731
ipm->remove_subscription (s1_id);
712
732
ipm->remove_subscription (s2_id);
713
733
714
- auto s3 = std::make_shared<SubscriptionIntraProcessT>();
734
+ auto s3 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
715
735
s3->take_shared_method = false ;
716
736
auto s3_id = ipm->template add_subscription <MessageT>(s3);
717
737
718
- auto s4 = std::make_shared<SubscriptionIntraProcessT>();
738
+ auto s4 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
719
739
s4->take_shared_method = false ;
720
740
auto s4_id = ipm->template add_subscription <MessageT>(s4);
721
741
722
- auto s5 = std::make_shared<SubscriptionIntraProcessT>();
742
+ auto s5 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
723
743
s5->take_shared_method = true ;
724
744
auto s5_id = ipm->template add_subscription <MessageT>(s5);
725
745
@@ -743,19 +763,19 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) {
743
763
ipm->remove_subscription (s4_id);
744
764
ipm->remove_subscription (s5_id);
745
765
746
- auto s6 = std::make_shared<SubscriptionIntraProcessT>();
766
+ auto s6 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
747
767
s6->take_shared_method = true ;
748
768
auto s6_id = ipm->template add_subscription <MessageT>(s6);
749
769
750
- auto s7 = std::make_shared<SubscriptionIntraProcessT>();
770
+ auto s7 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
751
771
s7->take_shared_method = true ;
752
772
auto s7_id = ipm->template add_subscription <MessageT>(s7);
753
773
754
- auto s8 = std::make_shared<SubscriptionIntraProcessT>();
774
+ auto s8 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
755
775
s8->take_shared_method = false ;
756
776
auto s8_id = ipm->template add_subscription <MessageT>(s8);
757
777
758
- auto s9 = std::make_shared<SubscriptionIntraProcessT>();
778
+ auto s9 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
759
779
s9->take_shared_method = false ;
760
780
auto s9_id = ipm->template add_subscription <MessageT>(s9);
761
781
@@ -781,12 +801,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) {
781
801
ipm->remove_subscription (s8_id);
782
802
ipm->remove_subscription (s9_id);
783
803
784
- auto s10 = std::make_shared<SubscriptionIntraProcessT>();
804
+ auto s10 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
785
805
s10->take_shared_method = false ;
786
806
auto s10_id = ipm->template add_subscription <MessageT>(s10);
787
807
(void )s10_id;
788
808
789
- auto s11 = std::make_shared<SubscriptionIntraProcessT>();
809
+ auto s11 = std::make_shared<SubscriptionIntraProcessT>(" topic " , rclcpp::QoS ( 10 ) );
790
810
s11->take_shared_method = true ;
791
811
auto s11_id = ipm->template add_subscription <MessageT>(s11);
792
812
(void )s11_id;
@@ -831,10 +851,12 @@ TEST(TestIntraProcessManager, lowest_available_capacity) {
831
851
832
852
auto ipm = std::make_shared<IntraProcessManagerT>();
833
853
834
- auto p1 = std::make_shared<PublisherT>(rclcpp::QoS (history_depth).best_effort ());
854
+ auto p1 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS (history_depth).best_effort ());
835
855
836
- auto s1 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS (history_depth).best_effort ());
837
- auto s2 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS (history_depth).best_effort ());
856
+ auto s1 =
857
+ std::make_shared<SubscriptionIntraProcessT>(" topic" , rclcpp::QoS (history_depth).best_effort ());
858
+ auto s2 =
859
+ std::make_shared<SubscriptionIntraProcessT>(" topic" , rclcpp::QoS (history_depth).best_effort ());
838
860
839
861
auto p1_id = ipm->add_publisher (p1);
840
862
p1->set_intra_process_manager (p1_id, ipm);
@@ -902,7 +924,7 @@ TEST(TestIntraProcessManager, transient_local_invalid_buffer) {
902
924
903
925
auto ipm = std::make_shared<IntraProcessManagerT>();
904
926
905
- auto p1 = std::make_shared<PublisherT>(rclcpp::QoS (history_depth).transient_local ());
927
+ auto p1 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS (history_depth).transient_local ());
906
928
907
929
ASSERT_THROW (
908
930
{
@@ -926,14 +948,14 @@ TEST(TestIntraProcessManager, transient_local) {
926
948
927
949
auto ipm = std::make_shared<IntraProcessManagerT>();
928
950
929
- auto p1 = std::make_shared<PublisherT>(rclcpp::QoS (history_depth).transient_local ());
951
+ auto p1 = std::make_shared<PublisherT>(" topic " , rclcpp::QoS (history_depth).transient_local ());
930
952
931
- auto s1 =
932
- std::make_shared<SubscriptionIntraProcessT>( rclcpp::QoS (history_depth).transient_local ());
933
- auto s2 =
934
- std::make_shared<SubscriptionIntraProcessT>( rclcpp::QoS (history_depth).transient_local ());
935
- auto s3 =
936
- std::make_shared<SubscriptionIntraProcessT>( rclcpp::QoS (history_depth).transient_local ());
953
+ auto s1 = std::make_shared<SubscriptionIntraProcessT>(
954
+ " topic " , rclcpp::QoS (history_depth).transient_local ());
955
+ auto s2 = std::make_shared<SubscriptionIntraProcessT>(
956
+ " topic " , rclcpp::QoS (history_depth).transient_local ());
957
+ auto s3 = std::make_shared<SubscriptionIntraProcessT>(
958
+ " topic " , rclcpp::QoS (history_depth).transient_local ());
937
959
938
960
s1->take_shared_method = false ;
939
961
s2->take_shared_method = true ;
0 commit comments