Skip to content

Commit

Permalink
Instrument client/service for end-to-end request/response tracking
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard committed Dec 9, 2024
1 parent 9517aac commit 37ee160
Showing 1 changed file with 56 additions and 6 deletions.
62 changes: 56 additions & 6 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4676,6 +4676,13 @@ extern "C" rmw_ret_t rmw_take_response(
info->reqtime.erase(seq);
}
#endif
TRACETOOLS_TRACEPOINT(
rmw_take_response,
static_cast<const void *>(client),
static_cast<const void *>(ros_response),
(nullptr != request_header ? request_header->request_id.sequence_number : 0LL),
(nullptr != request_header ? request_header->source_timestamp : 0LL),
*taken);
return ret;
}

Expand Down Expand Up @@ -4706,17 +4713,30 @@ extern "C" rmw_ret_t rmw_take_request(
service, service->implementation_identifier, eclipse_cyclonedds_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
auto info = static_cast<CddsService *>(service->data);
return rmw_take_response_request(
rmw_ret_t ret = rmw_take_response_request(
&info->service, request_header, ros_request, taken, nullptr,
false);
if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_take_request)) {
// Do not use the whole request_header->writer_guid, see the rmw_client_init tracepoint trigger
rmw_gid_t gid{};
memcpy(gid.data, &request_header->request_id.writer_guid, sizeof(info->service.pub->pubiid));
TRACETOOLS_DO_TRACEPOINT(
rmw_take_request,
static_cast<const void *>(service),
static_cast<const void *>(ros_request),
gid.data,
(nullptr != request_header ? request_header->request_id.sequence_number : 0LL),
*taken);
}
return ret;
}

static rmw_ret_t rmw_send_response_request(
CddsCS * cs, const cdds_request_header_t & header,
const void * ros_data)
const void * ros_data, const dds_time_t timestamp)
{
const cdds_request_wrapper_t wrap = {header, const_cast<void *>(ros_data)};
if (dds_write(cs->pub->enth, static_cast<const void *>(&wrap)) >= 0) {
if (dds_write_ts(cs->pub->enth, static_cast<const void *>(&wrap), timestamp) >= 0) {
return RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
Expand Down Expand Up @@ -4818,7 +4838,22 @@ extern "C" rmw_ret_t rmw_send_response(
case client_present_t::MAYBE:
return RMW_RET_TIMEOUT;
case client_present_t::YES:
return rmw_send_response_request(&info->service, header, ros_response);
{
const dds_time_t timestamp = dds_time();
if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_send_response)) {
// Do not use request_header->writer_guid, see the rmw_client_init tracepoint trigger
rmw_gid_t gid{};
memcpy(gid.data, &header.guid, sizeof(header.guid));
TRACETOOLS_DO_TRACEPOINT(
rmw_send_response,
static_cast<const void *>(service),
static_cast<const void *>(ros_response),
gid.data,
header.seq,
timestamp);
}
return rmw_send_response_request(&info->service, header, ros_response, timestamp);
}
case client_present_t::GONE:
return RMW_RET_OK;
}
Expand All @@ -4841,15 +4876,21 @@ extern "C" rmw_ret_t rmw_send_request(
cdds_request_header_t header;
header.guid = info->client.pub->pubiid;
header.seq = *sequence_id = ++next_request_id;
const dds_time_t timestamp = dds_time();

#if REPORT_BLOCKED_REQUESTS
{
std::lock_guard<std::mutex> lock(info->lock);
info->reqtime[header.seq] = dds_time();
info->reqtime[header.seq] = timestamp;
}
#endif

return rmw_send_response_request(&info->client, header, ros_request);
TRACETOOLS_TRACEPOINT(
rmw_send_request,
static_cast<const void *>(client),
static_cast<const void *>(ros_request),
header.seq);
return rmw_send_response_request(&info->client, header, ros_request, timestamp);
}

static const rosidl_service_type_support_t * get_service_typesupport(
Expand Down Expand Up @@ -5151,6 +5192,15 @@ extern "C" rmw_client_t * rmw_create_client(
cleanup_client.cancel();
cleanup_fini_cs.cancel();
cleanup_info.cancel();
if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_client_init)) {
// rmw_cyclonedds uses info->client.pub->pubiid as the internal request header GUID, which is
// the first half (8 bytes out of 16 bytes) of the rmw_request_id_t's writer_guid. The second
// half doesn't match when read from the client side and the service side, so only use the first
// half. The second half will be zeros on both client side and service side.
rmw_gid_t gid{};
memcpy(gid.data, &info->client.pub->pubiid, sizeof(info->client.pub->pubiid));
TRACETOOLS_DO_TRACEPOINT(rmw_client_init, static_cast<const void *>(rmw_client), gid.data);
}
return rmw_client;
}

Expand Down

0 comments on commit 37ee160

Please sign in to comment.