Skip to content

Commit

Permalink
Test tag user callback with discarded return
Browse files Browse the repository at this point in the history
We were recently inquired by a user whether relying only on the
`ucs_status_t` passed to the user callback and discarding the
`ucxx::RequestTag` return from `ucxx::Endpoint::tagSend` and
`ucxx::Endpoint::tagRecv` is valid.  This turns out to be a valid usage,
althought I'm unsure whether encouraging this is a good idea, since
requests such as `ucxx::Endpoint::amRecv` require that the user retrieve
the resulting buffer from the returned `ucxx::RequestAm`, and if this is
discarded the buffer is lost. This PR adds a test for the aforementioned
use case but makes no changes to documentation to prevent encouraging
this pattern until we can decide whether we should support it or not.

For requests such as `ucxx::Endpoint::amRecv`, it might be worth
studying whether we could pass the resulting buffer and any other
attributes associated with it to the callback, in that case we may be
able to provide a safe pattern to always use the callback if the user
doesn't want to keep a referencce to the returned `ucxx::Request`
object.
  • Loading branch information
pentschev committed Aug 14, 2024
1 parent 02923ae commit 4a962e2
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions cpp/tests/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,49 @@ TEST_P(RequestTest, TagUserCallback)

for (const auto request : requests)
ASSERT_THAT(request->getStatus(), UCS_OK);
for (const auto status : requestStatus)
ASSERT_THAT(status, UCS_OK);

// Assert data correctness
ASSERT_THAT(_recv[0], ContainerEq(_send[0]));
}

TEST_P(RequestTest, TagUserCallbackDiscardReturn)
{
allocate();

std::vector<ucs_status_t> requestStatus(2, UCS_INPROGRESS);

auto checkStatus = [&requestStatus](ucs_status_t status, ::ucxx::RequestCallbackUserData data) {
auto idx = *std::static_pointer_cast<size_t>(data);
requestStatus[idx] = status;
};

auto checkCompletion = [&requestStatus, this]() {
std::vector<size_t> completed(2, 0);
while (std::accumulate(completed.begin(), completed.end(), 0) != 2) {
_progressWorker();
std::transform(
requestStatus.begin(), requestStatus.end(), completed.begin(), [](ucs_status_t status) {
return status == UCS_INPROGRESS ? 0 : 1;
});
}
};

auto sendIndex = std::make_shared<size_t>(0u);
auto recvIndex = std::make_shared<size_t>(1u);

// Submit and wait for transfers to complete
std::ignore =
_ep->tagSend(_sendPtr[0], _messageSize, ucxx::Tag{0}, false, checkStatus, sendIndex);
std::ignore = _ep->tagRecv(
_recvPtr[0], _messageSize, ucxx::Tag{0}, ucxx::TagMaskFull, false, checkStatus, recvIndex);
checkCompletion();

copyResults();

for (const auto status : requestStatus)
ASSERT_THAT(status, UCS_OK);

// Assert data correctness
ASSERT_THAT(_recv[0], ContainerEq(_send[0]));
Expand Down

0 comments on commit 4a962e2

Please sign in to comment.