Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ jobs:

- name: Download dependencies
run: |
git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR
git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b ros2 https://github.com/eProsima/Micro-CDR src/Micro-CDR
git clone -b feature/run_session_until_one_stream https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
git clone -b feature/run_session_until_one_stream https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b ros2 https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client

git clone -b galactic https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds
touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE
# Install coverage tools
Expand Down
52 changes: 52 additions & 0 deletions rmw_microxrcedds_c/include/rmw_microros/init_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,58 @@ rmw_ret_t rmw_uros_options_set_client_key(
uint32_t client_key,
rmw_init_options_t * rmw_options);

/**
* \brief Fills rmw implementation-specific options with the given parameters.
* In this case, the stream number of a publisher can be defined.
* This function may use static memory resources that should be freed.
*
* \param[in] stream Micro XRCE-DDS Client stream number.
* \param[in,out] rmw_options Updated options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_set_publisher_out_stream(
size_t stream,
rmw_publisher_options_t * rmw_options);

/**
* \brief Free rmw implementation-specific static resources allocated.
*
* \param[in,out] rmw_options RMW options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_free_publisher_init_options(
rmw_publisher_options_t * rmw_options);

/**
* \brief Fills rmw implementation-specific options with the given parameters.
* In this case, the stream number of a subscriber can be defined.
* This function may use static memory resources that should be freed.
*
* \param[in] stream Micro XRCE-DDS Client stream number.
* \param[in,out] rmw_options Updated options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_set_subscriber_input_stream(
size_t stream,
rmw_subscription_options_t * rmw_options);

/**
* \brief Free rmw implementation-specific static resources allocated.
*
* \param[in,out] rmw_options Options with rmw specifics.
* \return RMW_RET_OK If arguments were valid and set in rmw_init_options.
* \return RMW_RET_INVALID_ARGUMENT If rmw_init_options is not valid or unexpected arguments.
* \return RMW_RET_ERROR In other case.
*/
rmw_ret_t rmw_uros_free_subscriber_init_options(
rmw_subscription_options_t * rmw_options);

/** @}*/

#if defined(__cplusplus)
Expand Down
1 change: 1 addition & 0 deletions rmw_microxrcedds_c/src/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#define RMW_UXRCE_MAX_TOPICS @RMW_UXRCE_MAX_TOPICS@
#define RMW_UXRCE_MAX_WAIT_SETS @RMW_UXRCE_MAX_WAIT_SETS@
#define RMW_UXRCE_MAX_GUARD_CONDITION @RMW_UXRCE_MAX_GUARD_CONDITION@
#define RMW_UXRCE_MAX_ENTITIES_INIT_OPTION RMW_UXRCE_MAX_PUBLISHERS + RMW_UXRCE_MAX_SUBSCRIPTIONS

#if RMW_UXRCE_MAX_TOPICS == -1
#define RMW_UXRCE_MAX_TOPICS_INTERNAL RMW_UXRCE_MAX_PUBLISHERS + RMW_UXRCE_MAX_SUBSCRIPTIONS
Expand Down
8 changes: 4 additions & 4 deletions rmw_microxrcedds_c/src/rmw_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ rmw_create_client(

custom_client->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;
custom_node->context->best_effort_output[0] :
custom_node->context->reliable_output[0];

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;
custom_node->context->best_effort_input[0] :
custom_node->context->reliable_input[0];

custom_client->client_data_request = uxr_buffer_request_data(
&custom_node->context->session,
Expand Down
51 changes: 33 additions & 18 deletions rmw_microxrcedds_c/src/rmw_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,12 @@ rmw_init(
context_impl->destroy_timeout = RMW_UXRCE_ENTITY_DESTROY_TIMEOUT;

context_impl->creation_stream = (RMW_UXRCE_ENTITY_CREATION_TIMEOUT > 0) ?
&context_impl->reliable_output :
&context_impl->best_effort_output;
&context_impl->reliable_output[0] :
&context_impl->best_effort_output[0];

context_impl->destroy_stream = (RMW_UXRCE_ENTITY_DESTROY_TIMEOUT > 0) ?
&context_impl->reliable_output :
&context_impl->best_effort_output;
&context_impl->reliable_output[0] :
&context_impl->best_effort_output[0];

context_impl->id_participant = 0;
context_impl->id_topic = 0;
Expand Down Expand Up @@ -269,6 +269,10 @@ rmw_init(
rmw_uxrce_init_guard_condition_memory(
&guard_condition_memory, custom_guard_condition,
RMW_UXRCE_MAX_GUARD_CONDITION);
rmw_uxrce_init_entities_init_options_memory(
&entities_init_options_memory,
custom_entities_init_options,
RMW_UXRCE_MAX_ENTITIES_INIT_OPTION);

// Micro-XRCE-DDS Client transport initialization
rmw_ret_t transport_init_ret = rmw_uxrce_transport_init(
Expand All @@ -286,20 +290,31 @@ rmw_init(
uxr_set_request_callback(&context_impl->session, on_request, NULL);
uxr_set_reply_callback(&context_impl->session, on_reply, NULL);

context_impl->reliable_input = uxr_create_input_reliable_stream(
&context_impl->session, context_impl->input_reliable_stream_buffer,
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY_INPUT,
RMW_UXRCE_STREAM_HISTORY_INPUT);
context_impl->reliable_output =
uxr_create_output_reliable_stream(
&context_impl->session, context_impl->output_reliable_stream_buffer,
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY_OUTPUT,
RMW_UXRCE_STREAM_HISTORY_OUTPUT);

context_impl->best_effort_input = uxr_create_input_best_effort_stream(&context_impl->session);
context_impl->best_effort_output = uxr_create_output_best_effort_stream(
&context_impl->session,
context_impl->output_best_effort_stream_buffer, context_impl->transport.comm.mtu);
for (size_t i = 0; i < UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS; i++) {
context_impl->reliable_input[i] = uxr_create_input_reliable_stream(
&context_impl->session, context_impl->input_reliable_stream_buffer[i],
RMW_UXRCE_MAX_INPUT_BUFFER_SIZE,
RMW_UXRCE_STREAM_HISTORY_INPUT);
}

for (size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS; i++) {
context_impl->reliable_output[i] =
uxr_create_output_reliable_stream(
&context_impl->session, context_impl->output_reliable_stream_buffer[i],
RMW_UXRCE_MAX_OUTPUT_BUFFER_SIZE,
RMW_UXRCE_STREAM_HISTORY_OUTPUT);
}

for (size_t i = 0; i < UXR_CONFIG_MAX_INPUT_BEST_EFFORT_STREAMS; i++) {
context_impl->best_effort_input[i] =
uxr_create_input_best_effort_stream(&context_impl->session);
}

for (size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS; i++) {
context_impl->best_effort_output[i] = uxr_create_output_best_effort_stream(
&context_impl->session,
context_impl->output_best_effort_stream_buffer[i], context_impl->transport.comm.mtu);
}

if (!uxr_create_session(&context_impl->session)) {
CLOSE_TRANSPORT(&context_impl->transport);
Expand Down
84 changes: 84 additions & 0 deletions rmw_microxrcedds_c/src/rmw_microros/init_options.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,87 @@ rmw_ret_t rmw_uros_options_set_client_key(

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_set_publisher_out_stream(
size_t stream,
rmw_publisher_options_t * rmw_options)
{
if (NULL == rmw_options) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

if (NULL == rmw_options->rmw_specific_publisher_payload) {
rmw_uxrce_mempool_item_t * memory_node = get_memory(&entities_init_options_memory);
if (NULL == memory_node) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher options");
return RMW_RET_ERROR;
}
rmw_options->rmw_specific_publisher_payload = memory_node->data;
}

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) rmw_options->rmw_specific_publisher_payload;
uxrce_init_options->stream_index.publisher_output_stream = stream;

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_free_publisher_init_options(
rmw_publisher_options_t * rmw_options)
{
if (NULL == rmw_options || NULL == rmw_options->rmw_specific_publisher_payload) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) rmw_options->rmw_specific_publisher_payload;

put_memory(&entities_init_options_memory, &uxrce_init_options->mem);
rmw_options->rmw_specific_publisher_payload = NULL;

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_set_subscriber_input_stream(
size_t stream,
rmw_subscription_options_t * rmw_options)
{
if (NULL == rmw_options) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

if (NULL == rmw_options->rmw_specific_subscription_payload) {
rmw_uxrce_mempool_item_t * memory_node = get_memory(&entities_init_options_memory);
if (NULL == memory_node) {
RMW_SET_ERROR_MSG("failed to allocate memory for subscriber options");
return RMW_RET_ERROR;
}
rmw_options->rmw_specific_subscription_payload = memory_node->data;
}

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) rmw_options->rmw_specific_subscription_payload;
uxrce_init_options->stream_index.subscriber_input_stream = stream;

return RMW_RET_OK;
}

rmw_ret_t rmw_uros_free_subscriber_init_options(
rmw_subscription_options_t * rmw_options)
{
if (NULL == rmw_options || NULL == rmw_options->rmw_specific_subscription_payload) {
RMW_SET_ERROR_MSG("Uninitialised rmw_init_options.");
return RMW_RET_INVALID_ARGUMENT;
}

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) rmw_options->rmw_specific_subscription_payload;

put_memory(&entities_init_options_memory, &uxrce_init_options->mem);
rmw_options->rmw_specific_subscription_payload = NULL;

return RMW_RET_OK;
}
32 changes: 25 additions & 7 deletions rmw_microxrcedds_c/src/rmw_microros_internal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,22 @@ typedef struct rmw_context_impl_t
#endif // if RMW_UXRCE_GRAPH
rmw_guard_condition_t graph_guard_condition;

uxrStreamId reliable_input;
uxrStreamId reliable_output;
uxrStreamId best_effort_output;
uxrStreamId best_effort_input;
uxrStreamId reliable_input[UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS];
uxrStreamId reliable_output[UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS];
uxrStreamId best_effort_output[UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS];
uxrStreamId best_effort_input[UXR_CONFIG_MAX_INPUT_BEST_EFFORT_STREAMS];

uxrStreamId * creation_stream;
uxrStreamId * destroy_stream;
int creation_timeout;
int destroy_timeout;

uint8_t input_reliable_stream_buffer[RMW_UXRCE_MAX_INPUT_BUFFER_SIZE];
uint8_t output_reliable_stream_buffer[RMW_UXRCE_MAX_OUTPUT_BUFFER_SIZE];
uint8_t output_best_effort_stream_buffer[RMW_UXRCE_MAX_TRANSPORT_MTU];
uint8_t input_reliable_stream_buffer[UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS][
RMW_UXRCE_MAX_INPUT_BUFFER_SIZE];
uint8_t output_reliable_stream_buffer[UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS][
RMW_UXRCE_MAX_OUTPUT_BUFFER_SIZE];
uint8_t output_best_effort_stream_buffer[UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS][
RMW_UXRCE_MAX_TRANSPORT_MTU];

uint16_t id_participant;
uint16_t id_topic;
Expand All @@ -113,6 +116,16 @@ struct rmw_init_options_impl_t

typedef struct rmw_init_options_impl_t rmw_uxrce_init_options_impl_t;

typedef struct rmw_uxrce_entities_init_options_t
{
rmw_uxrce_mempool_item_t mem;

union {
size_t publisher_output_stream;
size_t subscriber_input_stream;
} stream_index;
} rmw_uxrce_entities_init_options_t;

// ROS2 entities definitions

typedef struct rmw_uxrce_topic_t
Expand Down Expand Up @@ -294,6 +307,10 @@ extern rmw_uxrce_wait_set_t custom_wait_set[RMW_UXRCE_MAX_WAIT_SETS];
extern rmw_uxrce_mempool_t guard_condition_memory;
extern rmw_uxrce_guard_condition_t custom_guard_condition[RMW_UXRCE_MAX_GUARD_CONDITION];

extern rmw_uxrce_mempool_t entities_init_options_memory;
extern rmw_uxrce_entities_init_options_t custom_entities_init_options[
RMW_UXRCE_MAX_ENTITIES_INIT_OPTION];

// Memory init functions

#define RMW_INIT_DEFINE_MEMORY(X) \
Expand All @@ -313,6 +330,7 @@ RMW_INIT_DEFINE_MEMORY(static_input_buffer)
RMW_INIT_DEFINE_MEMORY(init_options_impl)
RMW_INIT_DEFINE_MEMORY(wait_set)
RMW_INIT_DEFINE_MEMORY(guard_condition)
RMW_INIT_DEFINE_MEMORY(entities_init_options)

// Memory management functions

Expand Down
10 changes: 7 additions & 3 deletions rmw_microxrcedds_c/src/rmw_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,14 @@ rmw_publish(
custom_publisher->stream_id);

if (UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) {
uxr_flash_output_streams(&custom_publisher->owner_node->context->session);
uxr_flash_one_output_stream(
&custom_publisher->owner_node->context->session,
custom_publisher->stream_id);
} else {
written &= uxr_run_session_until_confirm_delivery(
&custom_publisher->owner_node->context->session, custom_publisher->session_timeout);
written &= uxr_run_session_until_confirm_delivery_one_stream(
&custom_publisher->owner_node->context->session,
custom_publisher->stream_id,
custom_publisher->session_timeout);
}
}
if (!written) {
Expand Down
10 changes: 8 additions & 2 deletions rmw_microxrcedds_c/src/rmw_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,16 @@ rmw_create_publisher(
custom_publisher->session_timeout = RMW_UXRCE_PUBLISH_RELIABLE_TIMEOUT;
custom_publisher->qos = *qos_policies;

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) publisher_options->rmw_specific_publisher_payload;
size_t used_output_stream =
(NULL != uxrce_init_options) ?
uxrce_init_options->stream_index.publisher_output_stream : 0;

custom_publisher->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;
custom_node->context->best_effort_output[used_output_stream] :
custom_node->context->reliable_output[used_output_stream];

custom_publisher->cs_cb_size = NULL;
custom_publisher->cs_cb_serialization = NULL;
Expand Down
8 changes: 4 additions & 4 deletions rmw_microxrcedds_c/src/rmw_service.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ rmw_create_service(

custom_service->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;
custom_node->context->best_effort_output[0] :
custom_node->context->reliable_output[0];

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;
custom_node->context->best_effort_input[0] :
custom_node->context->reliable_input[0];

custom_service->service_data_resquest = uxr_buffer_request_data(
&custom_node->context->session,
Expand Down
10 changes: 8 additions & 2 deletions rmw_microxrcedds_c/src/rmw_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,16 @@ rmw_create_subscription(
delivery_control.max_elapsed_time = UXR_MAX_ELAPSED_TIME_UNLIMITED;
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;

rmw_uxrce_entities_init_options_t * uxrce_init_options =
(rmw_uxrce_entities_init_options_t *) subscription_options->rmw_specific_subscription_payload;
size_t used_input_stream =
(NULL != uxrce_init_options) ?
uxrce_init_options->stream_index.subscriber_input_stream : 0;

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;
custom_node->context->best_effort_input[used_input_stream] :
custom_node->context->reliable_input[used_input_stream];

uxr_buffer_request_data(
&custom_node->context->session,
Expand Down
Loading