Skip to content

Commit

Permalink
Merge branch 'branch-0.41' into bug/fix-rmm-imports
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 authored Oct 4, 2024
2 parents a5aaba7 + d8d5ca7 commit 3e2338b
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 30 deletions.
4 changes: 2 additions & 2 deletions ci/release/update-version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ NEXT_SHORT_TAG=${NEXT_MAJOR}.${NEXT_MINOR}
NEXT_RAPIDS_SHORT_TAG="$(curl -sL https://version.gpuci.io/ucx-py/${NEXT_SHORT_TAG})"

# Need to distutils-normalize the versions for some use cases
NEXT_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; print(packaging.version.Version('${NEXT_SHORT_TAG}'))")
NEXT_RAPIDS_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; print(packaging.version.Version('${NEXT_RAPIDS_SHORT_TAG}'))")
NEXT_SHORT_TAG_PEP440=$(python -c "from packaging.version import Version; print(Version('${NEXT_SHORT_TAG}'))")
NEXT_RAPIDS_SHORT_TAG_PEP440=$(python -c "from packaging.version import Version; print(Version('${NEXT_RAPIDS_SHORT_TAG}'))")
echo "Next tag is ${NEXT_RAPIDS_SHORT_TAG_PEP440}"

echo "Preparing release: $NEXT_FULL_TAG"
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies:
- dask-cuda==24.12.*,>=0.0.0a0
- dask-cudf==24.12.*,>=0.0.0a0
- doxygen=1.9.1
- fmt>=10.1.1,<11
- fmt>=11.0.2,<12
- librmm==24.12.*,>=0.0.0a0
- libtool
- ninja
Expand All @@ -40,6 +40,6 @@ dependencies:
- rmm==24.12.*,>=0.0.0a0
- scikit-build-core>=0.10.0
- setuptools>=64.0.0
- spdlog>=1.12.0,<1.13
- spdlog>=1.14.1,<1.15
- ucx>=1.15.0,<1.18
name: all_cuda-118_arch-x86_64
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-125_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies:
- dask-cuda==24.12.*,>=0.0.0a0
- dask-cudf==24.12.*,>=0.0.0a0
- doxygen=1.9.1
- fmt>=10.1.1,<11
- fmt>=11.0.2,<12
- librmm==24.12.*,>=0.0.0a0
- libtool
- ninja
Expand All @@ -40,6 +40,6 @@ dependencies:
- rmm==24.12.*,>=0.0.0a0
- scikit-build-core>=0.10.0
- setuptools>=64.0.0
- spdlog>=1.12.0,<1.13
- spdlog>=1.14.1,<1.15
- ucx>=1.15.0,<1.18
name: all_cuda-125_arch-x86_64
82 changes: 62 additions & 20 deletions cpp/python/src/future.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ PyObject* create_future_str = NULL;
PyObject* future_str = NULL;
PyObject* set_exception_str = NULL;
PyObject* set_result_str = NULL;
PyObject* done_str = NULL;
PyObject* cancelled_str = NULL;

static int intern_strings(void)
{
Expand All @@ -32,6 +34,10 @@ static int intern_strings(void)
if (set_exception_str == NULL) { return -1; }
set_result_str = PyUnicode_InternFromString("set_result");
if (set_result_str == NULL) { return -1; }
done_str = PyUnicode_InternFromString("done");
if (done_str == NULL) { return -1; }
cancelled_str = PyUnicode_InternFromString("cancelled");
if (cancelled_str == NULL) { return -1; }
return 0;
}

Expand Down Expand Up @@ -60,18 +66,12 @@ static PyObject* get_asyncio_future_object()
}

asyncio_module = PyImport_Import(asyncio_str);
if (PyErr_Occurred()) {
ucxx_trace_req("ucxx::python::%s, error importing asyncio", __func__);
PyErr_Print();
}
if (PyErr_Occurred()) PyErr_Print();
if (PyErr_Occurred()) ucxx_error("ucxx::python::%s, error importing asyncio", __func__);
if (asyncio_module == NULL) goto finish;

asyncio_future_object = PyObject_GetAttr(asyncio_module, future_str);
if (PyErr_Occurred()) {
ucxx_trace_req("ucxx::python::%s, error getting asyncio.Future method", __func__);
PyErr_Print();
}
if (PyErr_Occurred())
ucxx_error("ucxx::python::%s, error getting asyncio.Future method", __func__);
Py_DECREF(asyncio_module);
if (asyncio_future_object == NULL) { goto finish; }

Expand Down Expand Up @@ -103,13 +103,39 @@ PyObject* create_python_future()
}

result = PyObject_CallFunctionObjArgs(future_object, NULL);
if (PyErr_Occurred()) ucxx_error("ucxx::python::%s, error creating asyncio.Future", __func__);

finish:
PyGILState_Release(state);
return result;
}

PyObject* check_future_state(PyObject* future)
{
PyObject* result = NULL;

PyGILState_STATE state = PyGILState_Ensure();

result = PyObject_CallMethodNoArgs(future, cancelled_str);
if (PyErr_Occurred()) {
ucxx_trace_req("ucxx::python::%s, error creating asyncio.Future", __func__);
PyErr_Print();
ucxx_error("ucxx::python::%s, error calling `cancelled()` from `asyncio.Future` object",
__func__);
} else if (PyObject_IsTrue(result)) {
ucxx_trace_req("ucxx::python::%s, `asyncio.Future` object has been cancelled.", __func__);
goto finish;
}

result = PyObject_CallMethodNoArgs(future, done_str);
if (PyErr_Occurred()) {
ucxx_error("ucxx::python::%s, error calling `done()` from `asyncio.Future` object", __func__);
} else if (PyObject_IsTrue(result)) {
ucxx_trace_req("ucxx::python::%s, `asyncio.Future` object is already done.", __func__);
goto finish;
}

finish:
PyGILState_Release(state);

return result;
}

Expand All @@ -119,13 +145,22 @@ PyObject* future_set_result(PyObject* future, PyObject* value)

PyGILState_STATE state = PyGILState_Ensure();

if (PyObject_IsTrue(check_future_state(future))) {
ucxx_trace_req(
"ucxx::python::%s, `asyncio.Future` object is already done or has been cancelled, "
"skipping `set_result()`.",
__func__);
goto finish;
}

result = PyObject_CallMethodOneArg(future, set_result_str, value);
if (PyErr_Occurred()) {
ucxx_trace_req("ucxx::python::%s, error calling `set_result()` from `asyncio.Future` object",
__func__);
ucxx_error("ucxx::python::%s, error calling `set_result()` from `asyncio.Future` object",
__func__);
PyErr_Print();
}

finish:
PyGILState_Release(state);

return result;
Expand All @@ -140,6 +175,14 @@ PyObject* future_set_exception(PyObject* future, PyObject* exception, const char

PyGILState_STATE state = PyGILState_Ensure();

if (PyObject_IsTrue(check_future_state(future))) {
ucxx_trace_req(
"ucxx::python::%s, `asyncio.Future` object is already done or has been cancelled, "
"skipping `set_exception()`.",
__func__);
goto finish;
}

message_object = PyUnicode_FromString(message);
if (message_object == NULL) goto err;
message_tuple = PyTuple_Pack(1, message_object);
Expand Down Expand Up @@ -174,8 +217,7 @@ PyObject* create_python_future_with_event_loop(PyObject* event_loop)

result = PyObject_CallMethodObjArgs(event_loop, create_future_str, NULL);
if (PyErr_Occurred()) {
ucxx_trace_req("ucxx::python::%s, error calling `create_future` from event loop object",
__func__);
ucxx_error("ucxx::python::%s, error calling `create_future` from event loop object", __func__);
PyErr_Print();
}

Expand All @@ -198,8 +240,8 @@ PyObject* future_set_result_with_event_loop(PyObject* event_loop, PyObject* futu

set_result_callable = PyObject_GetAttr(future, set_result_str);
if (PyErr_Occurred()) {
ucxx_trace_req(
"ucxx::python::%s, error getting `set_result` method from `asyncio.Future` object", __func__);
ucxx_error("ucxx::python::%s, error getting `set_result` method from `asyncio.Future` object",
__func__);
PyErr_Print();
goto finish;
}
Expand All @@ -214,7 +256,7 @@ PyObject* future_set_result_with_event_loop(PyObject* event_loop, PyObject* futu
result = PyObject_CallMethodObjArgs(
event_loop, call_soon_threadsafe_str, set_result_callable, value, NULL);
if (PyErr_Occurred()) {
ucxx_trace_req(
ucxx_error(
"ucxx::python::%s, error calling `call_soon_threadsafe` from event loop object to set future "
"result",
__func__);
Expand Down Expand Up @@ -247,7 +289,7 @@ PyObject* future_set_exception_with_event_loop(PyObject* event_loop,

set_exception_callable = PyObject_GetAttr(future, set_exception_str);
if (PyErr_Occurred()) {
ucxx_trace_req(
ucxx_error(
"ucxx::python::%s, Error getting `set_exception` method from `asyncio.Future` object",
__func__);
PyErr_Print();
Expand All @@ -271,7 +313,7 @@ PyObject* future_set_exception_with_event_loop(PyObject* event_loop,
result = PyObject_CallMethodObjArgs(
event_loop, call_soon_threadsafe_str, set_exception_callable, formed_exception, NULL);
if (PyErr_Occurred()) {
ucxx_trace_req(
ucxx_error(
"ucxx::python::%s, Error calling `call_soon_threadsafe` from event loop object to set future "
"exception",
__func__);
Expand Down
43 changes: 43 additions & 0 deletions cpp/tests/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,49 @@ TEST_P(RequestTest, TagUserCallback)

for (const auto request : requests)
ASSERT_THAT(request->getStatus(), UCS_OK);
for (const auto status : requestStatus)
ASSERT_THAT(status, UCS_OK);

// Assert data correctness
ASSERT_THAT(_recv[0], ContainerEq(_send[0]));
}

TEST_P(RequestTest, TagUserCallbackDiscardReturn)
{
allocate();

std::vector<ucs_status_t> requestStatus(2, UCS_INPROGRESS);

auto checkStatus = [&requestStatus](ucs_status_t status, ::ucxx::RequestCallbackUserData data) {
auto idx = *std::static_pointer_cast<size_t>(data);
requestStatus[idx] = status;
};

auto checkCompletion = [&requestStatus, this]() {
std::vector<size_t> completed(2, 0);
while (std::accumulate(completed.begin(), completed.end(), 0) != 2) {
_progressWorker();
std::transform(
requestStatus.begin(), requestStatus.end(), completed.begin(), [](ucs_status_t status) {
return status == UCS_INPROGRESS ? 0 : 1;
});
}
};

auto sendIndex = std::make_shared<size_t>(0u);
auto recvIndex = std::make_shared<size_t>(1u);

// Submit and wait for transfers to complete
std::ignore =
_ep->tagSend(_sendPtr[0], _messageSize, ucxx::Tag{0}, false, checkStatus, sendIndex);
std::ignore = _ep->tagRecv(
_recvPtr[0], _messageSize, ucxx::Tag{0}, ucxx::TagMaskFull, false, checkStatus, recvIndex);
checkCompletion();

copyResults();

for (const auto status : requestStatus)
ASSERT_THAT(status, UCS_OK);

// Assert data correctness
ASSERT_THAT(_recv[0], ContainerEq(_send[0]));
Expand Down
4 changes: 2 additions & 2 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ dependencies:
- c-compiler
- cxx-compiler
- &cmake_ver cmake>=3.26.4,!=3.30.0
- fmt>=10.1.1,<11
- fmt>=11.0.2,<12
- librmm==24.12.*,>=0.0.0a0
- ninja
- spdlog>=1.12.0,<1.13
- spdlog>=1.14.1,<1.15
- output_types: [requirements, pyproject]
packages:
- *cmake_ver
Expand Down
4 changes: 2 additions & 2 deletions python/libucxx/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ readme = { file = "README.md", content-type = "text/markdown" }
authors = [
{ name = "NVIDIA Corporation" },
]
license = { text = "Apache 2.0" }
license = { text = "BSD-3-Clause" }
classifiers = [
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
Expand Down Expand Up @@ -64,4 +64,4 @@ requires = [
"librmm==24.12.*,>=0.0.0a0",
"libucx==1.15.0",
"ninja",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.

0 comments on commit 3e2338b

Please sign in to comment.