Skip to content

Commit 90c3d40

Browse files
committed
Squash Rebase P2P
Adjust pub sub test to only one call to rmw_wait Refactor entities creation XML and REF Added entity creation buffer pointer Added entity creation buffer pointer Updates Fix Updates Updates Updated CI Typos corrections
1 parent 9e27b72 commit 90c3d40

File tree

15 files changed

+242
-114
lines changed

15 files changed

+242
-114
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ jobs:
1919
- name: Download dependencies
2020
run: |
2121
git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR
22-
git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
22+
git clone -b feature/brokerless_p2p https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
2323
git clone -b foxy https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds
2424
touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE
2525
2626
- name: Build
27-
run: . /opt/ros/foxy/setup.sh && colcon build
27+
run: . /opt/ros/foxy/setup.sh && colcon build --cmake-args -DUCLIENT_MAX_SESSION_CONNECTION_ATTEMPTS=1
2828

2929
- name: Test
3030
run: |

rmw_microxrcedds_c/src/callbacks.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ void on_topic(
4949
if ((custom_subscription->datareader_id.id == object_id.id) &&
5050
(custom_subscription->datareader_id.type == object_id.type))
5151
{
52-
custom_subscription->micro_buffer_lenght[custom_subscription->history_write_index] = length;
52+
custom_subscription->micro_buffer_length[custom_subscription->history_write_index] = length;
5353
ucdr_deserialize_array_uint8_t(
5454
ub,
5555
custom_subscription->micro_buffer[custom_subscription->history_write_index], length);
@@ -89,7 +89,7 @@ void on_request(
8989
while (service_item != NULL) {
9090
rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)service_item->data;
9191
if (custom_service->request_id == request_id) {
92-
custom_service->micro_buffer_lenght[custom_service->history_write_index] = length;
92+
custom_service->micro_buffer_length[custom_service->history_write_index] = length;
9393
ucdr_deserialize_array_uint8_t(
9494
ub,
9595
custom_service->micro_buffer[custom_service->history_write_index], length);
@@ -132,7 +132,7 @@ void on_reply(
132132
while (client_item != NULL) {
133133
rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client_item->data;
134134
if (custom_client->request_id == request_id) {
135-
custom_client->micro_buffer_lenght[custom_client->history_write_index] = length;
135+
custom_client->micro_buffer_length[custom_client->history_write_index] = length;
136136
ucdr_deserialize_array_uint8_t(
137137
ub,
138138
custom_client->micro_buffer[custom_client->history_write_index], length);

rmw_microxrcedds_c/src/rmw_client.c

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,24 @@ rmw_create_client(
127127
}
128128
client_req = uxr_buffer_create_requester_xml(
129129
&custom_node->context->session,
130-
custom_node->context->reliable_output, custom_client->client_id,
131-
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
130+
*custom_node->context->entity_creation_output,
131+
custom_client->client_id,
132+
custom_node->participant_id,
133+
rmw_uxrce_xml_buffer,
134+
UXR_REPLACE);
132135
#elif defined(MICRO_XRCEDDS_USE_REFS)
133-
// TODO(pablogs9): Is possible to instantiate a replier by ref?
134-
// client_req = uxr_buffer_create_replier_ref(&custom_node->context->session,
135-
// custom_node->context->reliable_output, custom_service->subscriber_id,
136-
// custom_node->participant_id, "", UXR_REPLACE);
136+
if (!build_requester_profile(service_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) {
137+
RMW_SET_ERROR_MSG("failed to generate ref request for client creation");
138+
goto fail;
139+
}
140+
141+
client_req = uxr_buffer_create_requester_ref(
142+
&custom_node->context->session,
143+
*custom_node->context->entity_creation_output,
144+
custom_client->client_id,
145+
custom_node->participant_id,
146+
rmw_uxrce_profile_name,
147+
UXR_REPLACE);
137148
#endif
138149

139150
rmw_client->data = custom_client;
@@ -144,7 +155,7 @@ rmw_create_client(
144155
&custom_node->context->session, 1000, requests,
145156
status, 1))
146157
{
147-
RMW_SET_ERROR_MSG("Issues creating Micro XRCE-DDS entities");
158+
RMW_SET_ERROR_MSG("Issues creating Requester Micro XRCE-DDS entities");
148159
put_memory(&client_memory, &custom_client->mem);
149160
goto fail;
150161
}
@@ -162,8 +173,10 @@ rmw_create_client(
162173

163174
custom_client->request_id = uxr_buffer_request_data(
164175
&custom_node->context->session,
165-
custom_node->context->reliable_output, custom_client->client_id,
166-
custom_client->stream_id, &delivery_control);
176+
*custom_node->context->entity_creation_output,
177+
custom_client->client_id,
178+
custom_client->stream_id,
179+
&delivery_control);
167180
}
168181
return rmw_client;
169182

@@ -203,7 +216,8 @@ rmw_destroy_client(
203216
rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client->data;
204217
uint16_t delete_client =
205218
uxr_buffer_delete_entity(
206-
&custom_node->context->session, custom_node->context->reliable_output,
219+
&custom_node->context->session,
220+
*custom_node->context->entity_creation_output,
207221
custom_client->client_id);
208222

209223
uint16_t requests[] = {delete_client};
@@ -212,7 +226,7 @@ rmw_destroy_client(
212226
&custom_node->context->session, 1000, requests, status,
213227
sizeof(status)))
214228
{
215-
RMW_SET_ERROR_MSG("unable to remove client from the server");
229+
RMW_SET_ERROR_MSG("Unable to remove client from the server");
216230
result_ret = RMW_RET_ERROR;
217231
} else {
218232
rmw_uxrce_fini_client_memory(client);

rmw_microxrcedds_c/src/rmw_init.c

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,13 @@
3333
#include <termios.h>
3434
#endif
3535

36+
#ifdef UCLIENT_BROKERLESS_ENABLE
37+
#include <uxr/client/brokerless/brokerless.h>
38+
#endif
39+
3640
#if defined(MICRO_XRCEDDS_SERIAL) || defined(MICRO_XRCEDDS_CUSTOM_SERIAL)
3741
#define CLOSE_TRANSPORT(transport) uxr_close_serial_transport(transport)
38-
#elif defined(MICRO_XRCEDDS_UDP)
42+
#elif defined(MICRO_XRCEDDS_UDP) && !defined(UCLIENT_BROKERLESS_ENABLE)
3943
#define CLOSE_TRANSPORT(transport) uxr_close_udp_transport(transport)
4044
#else
4145
#define CLOSE_TRANSPORT(transport)
@@ -245,8 +249,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
245249
}
246250
printf("Serial mode => dev: %s\n", context_impl->connection_params.serial_device);
247251

248-
#elif defined(MICRO_XRCEDDS_UDP)
249-
// TODO(Borja) Think how we are going to select transport to use
252+
#elif defined(MICRO_XRCEDDS_UDP) && !defined(UCLIENT_BROKERLESS_ENABLE)
250253
#ifdef MICRO_XRCEDDS_IPV4
251254
uxrIpProtocol ip_protocol = UXR_IPv4;
252255
#elif defined(MICRO_XRCEDDS_IPV6)
@@ -263,6 +266,11 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
263266
printf(
264267
"UDP mode => ip: %s - port: %s\n", context_impl->connection_params.agent_address,
265268
context_impl->connection_params.agent_port);
269+
#elif defined(MICRO_XRCEDDS_UDP) && defined(UCLIENT_BROKERLESS_ENABLE)
270+
271+
context_impl->transport.comm = brokerless_comm_stub;
272+
273+
printf("UDP mode => Brokerless P2P\n");
266274
#elif defined(MICRO_XRCEDDS_CUSTOM_SERIAL)
267275
int pseudo_fd = 0;
268276
if (strlen(options->impl->connection_params.serial_device) > 0) {
@@ -289,23 +297,26 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
289297

290298
context_impl->reliable_input = uxr_create_input_reliable_stream(
291299
&context_impl->session, context_impl->input_reliable_stream_buffer,
292-
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY, RMW_UXRCE_STREAM_HISTORY);
293-
context_impl->reliable_output =
294-
uxr_create_output_reliable_stream(
300+
RMW_UXRCE_MAX_BUFFER_SIZE, RMW_UXRCE_STREAM_HISTORY);
301+
context_impl->reliable_output = uxr_create_output_reliable_stream(
295302
&context_impl->session, context_impl->output_reliable_stream_buffer,
296-
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY, RMW_UXRCE_STREAM_HISTORY);
303+
RMW_UXRCE_MAX_BUFFER_SIZE, RMW_UXRCE_STREAM_HISTORY);
297304

298305
context_impl->best_effort_input = uxr_create_input_best_effort_stream(&context_impl->session);
299306
context_impl->best_effort_output = uxr_create_output_best_effort_stream(
300307
&context_impl->session,
301-
context_impl->output_best_effort_stream_buffer, context_impl->transport.comm.mtu);
302-
308+
context_impl->output_best_effort_stream_buffer, RMW_UXRCE_MAX_TRANSPORT_MTU);
303309

310+
#ifndef UCLIENT_BROKERLESS_ENABLE
311+
context_impl->entity_creation_output = &context_impl->reliable_output;
304312
if (!uxr_create_session(&context_impl->session)) {
305313
CLOSE_TRANSPORT(&context_impl->transport);
306314
RMW_SET_ERROR_MSG("failed to create node session on Micro ROS Agent.");
307315
return RMW_RET_ERROR;
308316
}
317+
#else
318+
context_impl->entity_creation_output = &context_impl->best_effort_output;
319+
#endif
309320

310321
return RMW_RET_OK;
311322
}

rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
#include "./utils.h"
2323

24-
2524
rmw_uxrce_topic_t *
2625
create_topic(
2726
struct rmw_uxrce_node_t * custom_node,
@@ -34,7 +33,6 @@ create_topic(
3433
RMW_SET_ERROR_MSG("Not available memory node");
3534
goto fail;
3635
}
37-
3836
rmw_uxrce_topic_t * custom_topic = (rmw_uxrce_topic_t *)memory_node->data;
3937

4038

@@ -63,8 +61,11 @@ create_topic(
6361

6462
topic_req = uxr_buffer_create_topic_xml(
6563
&custom_node->context->session,
66-
custom_node->context->reliable_output, custom_topic->topic_id,
67-
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
64+
*custom_node->context->entity_creation_output,
65+
custom_topic->topic_id,
66+
custom_node->participant_id,
67+
rmw_uxrce_xml_buffer,
68+
UXR_REPLACE);
6869
#elif defined(MICRO_XRCEDDS_USE_REFS)
6970
(void)qos_policies;
7071
if (!build_topic_profile(topic_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) {
@@ -76,18 +77,23 @@ create_topic(
7677

7778
topic_req = uxr_buffer_create_topic_ref(
7879
&custom_node->context->session,
79-
custom_node->context->reliable_output, custom_topic->topic_id,
80-
custom_node->participant_id, rmw_uxrce_profile_name, UXR_REPLACE);
80+
*custom_node->context->entity_creation_output,
81+
custom_topic->topic_id,
82+
custom_node->participant_id,
83+
rmw_uxrce_profile_name,
84+
UXR_REPLACE);
8185
#endif
8286

8387
// Send the request and wait for response
8488
uint8_t status;
85-
custom_topic->sync_with_agent =
86-
uxr_run_session_until_all_status(
87-
&custom_node->context->session, 1000, &topic_req,
88-
&status, 1);
89-
if (!custom_topic->sync_with_agent) {
90-
RMW_SET_ERROR_MSG("Issues creating micro XRCE-DDS entities");
89+
90+
if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type)
91+
{
92+
uxr_flash_output_streams(&custom_node->context->session);
93+
}
94+
else if(!(custom_topic->sync_with_agent = uxr_run_session_until_all_status(&custom_node->context->session, 1000, &topic_req, &status, 1)))
95+
{
96+
RMW_SET_ERROR_MSG("Issues creating Topic Micro XRCE-DDS entities");
9197
rmw_uxrce_fini_topic_memory(custom_topic);
9298
custom_topic = NULL;
9399
goto fail;
@@ -103,16 +109,22 @@ rmw_ret_t destroy_topic(rmw_uxrce_topic_t * topic)
103109

104110
uint16_t delete_topic = uxr_buffer_delete_entity(
105111
&topic->owner_node->context->session,
106-
topic->owner_node->context->reliable_output,
112+
*topic->owner_node->context->entity_creation_output,
107113
topic->topic_id);
108114

109115
uint16_t requests[] = {delete_topic};
110116
uint8_t status[1];
111-
if (!uxr_run_session_until_all_status(
117+
118+
if (UXR_BEST_EFFORT_STREAM == topic->owner_node->context->entity_creation_output->type)
119+
{
120+
uxr_flash_output_streams(&topic->owner_node->context->session);
121+
rmw_uxrce_fini_topic_memory(topic);
122+
}
123+
else if (!uxr_run_session_until_all_status(
112124
&topic->owner_node->context->session, 1000, requests,
113125
status, 1))
114126
{
115-
RMW_SET_ERROR_MSG("unable to remove topic from the server");
127+
RMW_SET_ERROR_MSG("Unable to remove topic from the server");
116128
result_ret = RMW_RET_ERROR;
117129
} else {
118130
rmw_uxrce_fini_topic_memory(topic);

rmw_microxrcedds_c/src/rmw_node.c

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,11 @@ rmw_node_t * create_node(
8585
participant_req =
8686
uxr_buffer_create_participant_xml(
8787
&node_info->context->session,
88-
node_info->context->reliable_output,
89-
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
88+
*node_info->context->entity_creation_output,
89+
node_info->participant_id,
90+
(uint16_t)domain_id,
91+
rmw_uxrce_xml_buffer,
92+
UXR_REPLACE);
9093
#elif defined(MICRO_XRCEDDS_USE_REFS)
9194
if (!build_participant_profile(rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) {
9295
RMW_SET_ERROR_MSG("failed to generate xml request for node creation");
@@ -95,15 +98,24 @@ rmw_node_t * create_node(
9598
participant_req =
9699
uxr_buffer_create_participant_ref(
97100
&node_info->context->session,
98-
node_info->context->reliable_output,
99-
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_profile_name, UXR_REPLACE);
101+
*custom_node->context->entity_creation_output,
102+
node_info->participant_id,
103+
(uint16_t)domain_id,
104+
rmw_uxrce_profile_name,
105+
UXR_REPLACE);
100106
#endif
107+
101108
uint8_t status[1];
102109
uint16_t requests[] = {participant_req};
103110

104-
if (!uxr_run_session_until_all_status(&node_info->context->session, 1000, requests, status, 1)) {
111+
if (UXR_BEST_EFFORT_STREAM == node_info->context->entity_creation_output->type)
112+
{
113+
uxr_flash_output_streams(&node_info->context->session);
114+
}
115+
else if(!uxr_run_session_until_all_status(&node_info->context->session, 1000, requests, status, 1))
116+
{
105117
rmw_uxrce_fini_node_memory(node_handle);
106-
RMW_SET_ERROR_MSG("Issues creating micro XRCE-DDS entities");
118+
RMW_SET_ERROR_MSG("Issues creating Participant Micro XRCE-DDS entities");
107119
return NULL;
108120
}
109121

@@ -201,14 +213,16 @@ rmw_ret_t rmw_destroy_node(rmw_node_t * node)
201213

202214
uint16_t participant_req = uxr_buffer_delete_entity(
203215
&custom_node->context->session,
204-
custom_node->context->reliable_output,
216+
*custom_node->context->entity_creation_output,
205217
custom_node->participant_id);
206218
uint8_t status[1];
207219
uint16_t requests[] = {participant_req};
208220

209-
if (!uxr_run_session_until_all_status(
210-
&custom_node->context->session, 1000, requests, status,
211-
1))
221+
if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type)
222+
{
223+
uxr_flash_output_streams(&custom_node->context->session);
224+
}
225+
else if(!uxr_run_session_until_all_status(&custom_node->context->session, 1000, requests, status,1))
212226
{
213227
ret = RMW_RET_ERROR;
214228
}

rmw_microxrcedds_c/src/rmw_publish.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ rmw_publish(
5454
{
5555
written = functions->cdr_serialize(ros_message, &mb);
5656

57-
if (UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) {
57+
if (written && UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) {
5858
uxr_flash_output_streams(&custom_publisher->owner_node->context->session);
5959
} else {
6060
written &= uxr_run_session_until_confirm_delivery(

0 commit comments

Comments
 (0)