Commit

Merge branch 'branch-0.41' into python-mp-test-timeout

pentschev authored Oct 16, 2024
2 parents 28a2215 + 79a3bfc commit 30bfd04
Showing 10 changed files with 127 additions and 69 deletions.
35 changes: 30 additions & 5 deletions .github/workflows/pr.yaml
@@ -12,6 +12,7 @@ concurrency:
jobs:
pr-builder:
needs:
- changed-files
- checks
- conda-cpp-build
- docs-build
@@ -25,6 +26,25 @@ jobs:
- wheel-tests-distributed-ucxx
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
if: always()
with:
needs: ${{ toJSON(needs) }}
changed-files:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
files_yaml: |
test_cpp:
- '**'
- '!.pre-commit-config.yaml'
- '!README.md'
- '!docs/**'
- '!python/**'
test_python:
- '**'
- '!.pre-commit-config.yaml'
- '!README.md'
- '!docs/**'
checks:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
@@ -47,23 +67,26 @@ jobs:
container_image: "rapidsai/ci-conda:latest"
run_script: "ci/build_docs.sh"
conda-cpp-tests:
needs: conda-cpp-build
needs: [conda-cpp-build, changed-files]
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
if: fromJSON(needs.changed-files.outputs.changed_file_groups).test_cpp
with:
build_type: pull-request
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
conda-python-tests:
needs: conda-cpp-build
needs: [conda-cpp-build, changed-files]
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
if: fromJSON(needs.changed-files.outputs.changed_file_groups).test_python
with:
build_type: pull-request
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
conda-python-distributed-tests:
needs: conda-cpp-build
needs: [conda-cpp-build, changed-files]
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
if: fromJSON(needs.changed-files.outputs.changed_file_groups).test_python
with:
build_type: pull-request
script: "ci/test_python_distributed.sh"
@@ -83,9 +106,10 @@ jobs:
build_type: pull-request
script: ci/build_wheel_ucxx.sh
wheel-tests-ucxx:
needs: wheel-build-ucxx
needs: [wheel-build-ucxx, changed-files]
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
if: fromJSON(needs.changed-files.outputs.changed_file_groups).test_python
with:
build_type: pull-request
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
@@ -98,9 +122,10 @@ jobs:
build_type: pull-request
script: ci/build_wheel_distributed_ucxx.sh
wheel-tests-distributed-ucxx:
needs: [wheel-build-ucxx, wheel-build-distributed-ucxx]
needs: [wheel-build-ucxx, wheel-build-distributed-ucxx, changed-files]
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
if: fromJSON(needs.changed-files.outputs.changed_file_groups).test_python
with:
build_type: pull-request
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
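The workflow changes above add a changed-files job whose changed_file_groups output is a JSON object mapping each files_yaml group (test_cpp, test_python) to whether any matching file changed; each test job then lists changed-files in needs and guards itself with if: fromJSON(...).test_cpp or .test_python, while pr-builder gains if: always() and needs: ${{ toJSON(needs) }} so it still reports when gated jobs are skipped. The Python sketch below only illustrates that gating decision: the group names and exclusion patterns come from pr.yaml, but the payload shape and matching logic are simplified assumptions, not the shared workflow's implementation.

# Illustrative sketch, not part of the commit: approximates what
# `if: fromJSON(needs.changed-files.outputs.changed_file_groups).test_python`
# evaluates in the workflow above. Exclusions mirror files_yaml in pr.yaml;
# the real changed-files workflow's glob semantics are more involved.
import fnmatch
import json

EXCLUSIONS = {
    "test_cpp": [".pre-commit-config.yaml", "README.md", "docs/*", "python/*"],
    "test_python": [".pre-commit-config.yaml", "README.md", "docs/*"],
}

def changed_file_groups(changed_paths):
    """Return a JSON object mapping each group to True if any changed path
    is not excluded for that group (i.e. the group's tests should run)."""
    groups = {
        group: any(
            not any(fnmatch.fnmatch(path, pattern) for pattern in patterns)
            for path in changed_paths
        )
        for group, patterns in EXCLUSIONS.items()
    }
    return json.dumps(groups)

# A docs-only PR skips both test groups; touching python/ runs only Python tests.
print(changed_file_groups(["docs/source/index.rst"]))  # {"test_cpp": false, "test_python": false}
print(changed_file_groups(["python/ucxx/foo.py"]))     # {"test_cpp": false, "test_python": true}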
10 changes: 5 additions & 5 deletions ci/build_docs.sh
@@ -6,6 +6,9 @@ set -euo pipefail
rapids-logger "Create test conda environment"
. /opt/conda/etc/profile.d/conda.sh

export UCXX_VERSION="$(head -1 ./VERSION)"
UCXX_VERSION_MAJOR_MINOR="$(sed -E -e 's/^([0-9]+)\.([0-9]+)\.([0-9]+).*$/\1.\2/' VERSION)"

ENV_YAML_DIR="$(mktemp -d)"

rapids-dependency-file-generator \
@@ -23,11 +26,8 @@ CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp)

rapids-mamba-retry install \
--channel "${CPP_CHANNEL}" \
libucxx
"libucxx=${UCXX_VERSION}"

export UCXX_VERSION="$(sed -E -e 's/^([0-9]+)\.([0-9]+)\.([0-9]+).*$/\1.\2.\3/' VERSION)"
export UCXX_VERSION_MAJOR_MINOR="$(sed -E -e 's/^([0-9]+)\.([0-9]+)\.([0-9]+).*$/\1.\2/' VERSION)"
export RAPIDS_VERSION_NUMBER="$UCXX_VERSION_MAJOR_MINOR"
export RAPIDS_DOCS_DIR="$(mktemp -d)"

rapids-logger "Build CPP docs"
@@ -37,4 +37,4 @@ mkdir -p "${RAPIDS_DOCS_DIR}/libucxx/html"
mv html/* "${RAPIDS_DOCS_DIR}/libucxx/html"
popd

rapids-upload-docs
RAPIDS_VERSION_NUMBER="${UCXX_VERSION_MAJOR_MINOR}" rapids-upload-docs
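ci/build_docs.sh (and the three test scripts below) now read the full version from the first line of VERSION before installing anything, pin the conda packages to it (e.g. "libucxx=${UCXX_VERSION}"), and derive the major.minor docs version with sed only where the upload needs it. A rough Python equivalent of the two extractions, for illustration only; the VERSION contents shown are hypothetical.

# Mirrors `head -1 ./VERSION` and the sed expression
# 's/^([0-9]+)\.([0-9]+)\.([0-9]+).*$/\1.\2/' from ci/build_docs.sh.
# The VERSION string below is made up for the example.
import re

version_file_text = "0.41.00a\n"                      # hypothetical ./VERSION contents
ucxx_version = version_file_text.splitlines()[0]      # head -1  -> "0.41.00a"
major, minor, _patch = re.match(r"^(\d+)\.(\d+)\.(\d+)", ucxx_version).groups()
major_minor = f"{major}.{minor}"                      # sed ... \1.\2 -> "0.41"

print(f"libucxx={ucxx_version}")                      # conda pin used by the scripts
print(f"RAPIDS_VERSION_NUMBER={major_minor}")         # passed to rapids-upload-docs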
6 changes: 5 additions & 1 deletion ci/test_cpp.sh
@@ -10,6 +10,8 @@ source "$(dirname "$0")/test_common.sh"
rapids-logger "Create test conda environment"
. /opt/conda/etc/profile.d/conda.sh

UCXX_VERSION="$(head -1 ./VERSION)"

rapids-dependency-file-generator \
--output conda \
--file-key test_cpp \
@@ -29,7 +31,9 @@ CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp)

rapids-mamba-retry install \
--channel "${CPP_CHANNEL}" \
libucxx libucxx-examples libucxx-tests
"libucxx=${UCXX_VERSION}" \
"libucxx-examples=${UCXX_VERSION}" \
"libucxx-tests=${UCXX_VERSION}"

print_ucx_config

5 changes: 4 additions & 1 deletion ci/test_python.sh
@@ -10,6 +10,8 @@ source "$(dirname "$0")/test_common.sh"
rapids-logger "Create test conda environment"
. /opt/conda/etc/profile.d/conda.sh

UCXX_VERSION="$(head -1 ./VERSION)"

rapids-dependency-file-generator \
--output conda \
--file-key test_python \
@@ -27,7 +29,8 @@ CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp)

rapids-mamba-retry install \
--channel "${CPP_CHANNEL}" \
libucxx ucxx
"libucxx=${UCXX_VERSION}" \
"ucxx=${UCXX_VERSION}"

print_ucx_config

6 changes: 5 additions & 1 deletion ci/test_python_distributed.sh
@@ -10,6 +10,8 @@ source "$(dirname "$0")/test_common.sh"
rapids-logger "Create test conda environment"
. /opt/conda/etc/profile.d/conda.sh

UCXX_VERSION="$(head -1 ./VERSION)"

rapids-dependency-file-generator \
--output conda \
--file-key test_python_distributed \
@@ -27,7 +29,9 @@ CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp)

rapids-mamba-retry install \
--channel "${CPP_CHANNEL}" \
libucxx ucxx distributed-ucxx
"libucxx=${UCXX_VERSION}" \
"ucxx=${UCXX_VERSION}" \
"distributed-ucxx=${UCXX_VERSION}"

print_ucx_config

84 changes: 36 additions & 48 deletions python/distributed-ucxx/distributed_ucxx/ucxx.py
@@ -14,6 +14,7 @@
import struct
import weakref
from collections.abc import Awaitable, Callable, Collection
from threading import Lock
from typing import TYPE_CHECKING, Any
from unittest.mock import patch

@@ -49,6 +50,13 @@
pre_existing_cuda_context = False
cuda_context_created = False
multi_buffer = None
# Lock protecting access to _resources dict
_resources_lock = Lock()
# Mapping from UCXX context handles to sets of registered dask resource IDs
# Used to track when there are no more users of the context, at which point
# its progress task and notification thread can be shut down.
# See _register_dask_resource and _deregister_dask_resource.
_resources = dict()


_warning_suffix = (
@@ -95,13 +103,13 @@ def make_register():
count = itertools.count()

def register() -> int:
"""Register a Dask resource with the UCXX context.
"""Register a Dask resource with the resource tracker.
Register a Dask resource with the UCXX context and keep track of it with the
use of a unique ID for the resource. The resource ID is later used to
deregister the resource from the UCXX context calling
`_deregister_dask_resource(resource_id)`, which stops the notifier thread
and progress tasks when no more UCXX resources are alive.
Generate a unique ID for the resource and register it with the resource
tracker. The resource ID is later used to deregister the resource from
the tracker calling `_deregister_dask_resource(resource_id)`, which
stops the notifier thread and progress tasks when no more UCXX resources
are alive.
Returns
-------
@@ -110,9 +118,13 @@ def register() -> int:
`_deregister_dask_resource` during stop/destruction of the resource.
"""
ctx = ucxx.core._get_ctx()
with ctx._dask_resources_lock:
handle = ctx.context.handle
with _resources_lock:
if handle not in _resources:
_resources[handle] = set()

resource_id = next(count)
ctx._dask_resources.add(resource_id)
_resources[handle].add(resource_id)
ctx.start_notifier_thread()
ctx.continuous_ucx_progress()
return resource_id
@@ -126,11 +138,11 @@ def register() -> int:


def _deregister_dask_resource(resource_id):
"""Deregister a Dask resource with the UCXX context.
"""Deregister a Dask resource from the resource tracker.
Deregister a Dask resource from the UCXX context with given ID, and if no
resources remain after deregistration, stop the notifier thread and progress
tasks.
Deregister a Dask resource from the resource tracker with given ID, and if
no resources remain after deregistration, stop the notifier thread and
progress tasks.
Parameters
----------
@@ -144,40 +156,22 @@ def _deregister_dask_resource(resource_id):
return

ctx = ucxx.core._get_ctx()
handle = ctx.context.handle

# Check if the attribute exists first, in tests the UCXX context may have
# been reset before some resources are deregistered.
if hasattr(ctx, "_dask_resources_lock"):
with ctx._dask_resources_lock:
try:
ctx._dask_resources.remove(resource_id)
except KeyError:
pass

# Stop notifier thread and progress tasks if no Dask resources using
# UCXX communicators are running anymore.
if len(ctx._dask_resources) == 0:
ctx.stop_notifier_thread()
ctx.progress_tasks.clear()


def _allocate_dask_resources_tracker() -> None:
"""Allocate Dask resources tracker.
Allocate a Dask resources tracker in the UCXX context. This is useful to
track Distributed communicators so that progress and notifier threads can
be cleanly stopped when no UCXX communicators are alive anymore.
"""
ctx = ucxx.core._get_ctx()
if not hasattr(ctx, "_dask_resources"):
# TODO: Move the `Lock` to a file/module-level variable for true
# lock-safety. The approach implemented below could cause race
# conditions if this function is called simultaneously by multiple
# threads.
from threading import Lock
with _resources_lock:
try:
_resources[handle].remove(resource_id)
except KeyError:
pass

ctx._dask_resources = set()
ctx._dask_resources_lock = Lock()
# Stop notifier thread and progress tasks if no Dask resources using
# UCXX communicators are running anymore.
if handle in _resources and len(_resources[handle]) == 0:
ctx.stop_notifier_thread()
ctx.progress_tasks.clear()
del _resources[handle]


def init_once():
@@ -187,11 +181,6 @@ def init_once():
global multi_buffer

if ucxx is not None:
# Ensure reallocation of Dask resources tracker if the UCXX context was
# reset since the previous `init_once()` call. This may happen in tests,
# where the `ucxx_loop` fixture will reset the context after each test.
_allocate_dask_resources_tracker()

return

# remove/process dask.ucx flags for valid ucx options
@@ -254,7 +243,6 @@ def init_once():
# environment, so the user's external environment can safely
# override things here.
ucxx.init(options=ucx_config, env_takes_precedence=True)
_allocate_dask_resources_tracker()

pool_size_str = dask.config.get("distributed.rmm.pool-size")

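The distributed_ucxx/ucxx.py change above replaces the per-context _dask_resources set (and its lazily created lock) with a module-level _resources dict keyed by the UCXX context handle, protected by a module-level Lock, so deregistration keeps working even after the context object is reset in tests. Below is a stripped-down sketch of that pattern with illustrative names: start_progress and stop_progress stand in for the notifier thread and progress tasks, and none of this is the module's public API.

# Minimal sketch of the module-level resource tracker introduced above.
import itertools
from threading import Lock

_resources_lock = Lock()                 # protects _resources
_resources: dict[int, set[int]] = {}     # context handle -> registered resource IDs
_counter = itertools.count()

def register(handle: int, start_progress) -> int:
    """Track one resource for `handle` and (re)start progress machinery."""
    with _resources_lock:
        resource_id = next(_counter)
        _resources.setdefault(handle, set()).add(resource_id)
    start_progress()
    return resource_id

def deregister(handle: int, resource_id: int, stop_progress) -> None:
    """Drop one resource; shut progress down once the handle has no users."""
    with _resources_lock:
        _resources.get(handle, set()).discard(resource_id)
        if handle in _resources and not _resources[handle]:
            del _resources[handle]
            stop_progress()

# Two communicators sharing one context: progress stops only after the last one.
rid_a = register(0xBEEF, lambda: print("start progress"))
rid_b = register(0xBEEF, lambda: None)
deregister(0xBEEF, rid_a, lambda: print("stop progress"))  # still one user, no output
deregister(0xBEEF, rid_b, lambda: print("stop progress"))  # prints "stop progress"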
7 changes: 6 additions & 1 deletion python/ucxx/ucxx/_lib/arr.pxd
@@ -1,6 +1,8 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

# cython: language_level=3


from libc.stdint cimport uintptr_t

@@ -22,3 +24,6 @@ cdef class Array:
cpdef bint _f_contiguous(self)
cpdef bint _contiguous(self)
cpdef Py_ssize_t _nbytes(self)


cpdef Array asarray(obj)
16 changes: 12 additions & 4 deletions python/ucxx/ucxx/_lib/arr.pyi
@@ -1,10 +1,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

from typing import Tuple
from typing import Generic, Tuple, TypeVar

class Array:
def __init__(self, obj: object): ...
T = TypeVar("T")

class Array(Generic[T]):
def __init__(self, obj: T): ...
@property
def c_contiguous(self) -> bool: ...
@property
@@ -17,3 +19,9 @@ class Array:
def shape(self) -> Tuple[int]: ...
@property
def strides(self) -> Tuple[int]: ...
@property
def cuda(self) -> bool: ...
@property
def obj(self) -> T: ...

def asarray(obj) -> Array: ...
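The stub above now declares Array as Generic[T], with __init__(self, obj: T) and obj returning T, so static type checkers can carry the wrapped object's concrete type through .obj. The self-contained snippet below shows the effect with a stand-in wrapper; it is plain Python typing, not ucxx code.

# Stand-in illustrating why the stub uses Generic[T]; `Wrapper` is not ucxx's Array.
from typing import Generic, TypeVar

T = TypeVar("T")

class Wrapper(Generic[T]):
    def __init__(self, obj: T) -> None:
        self._obj = obj

    @property
    def obj(self) -> T:
        return self._obj

w = Wrapper(bytearray(b"abc"))
# With the generic stub a checker infers `w.obj` as `bytearray`, so calls like
# `.extend()` type-check without casts; a non-generic stub would only know `object`.
w.obj.extend(b"def")
print(bytes(w.obj))   # b'abcdef'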
23 changes: 22 additions & 1 deletion python/ucxx/ucxx/_lib/arr.pyx
@@ -1,6 +1,8 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

# cython: language_level=3


from cpython.array cimport array, newarrayobject
from cpython.buffer cimport PyBuffer_IsContiguous
@@ -295,3 +297,22 @@ cdef inline Py_ssize_t _nbytes(Py_ssize_t itemsize,
for i in range(ndim):
nbytes *= shape_mv[i]
return nbytes


cpdef Array asarray(obj):
"""Coerce other objects to ``Array``. No-op for existing ``Array``s.
Parameters
----------
obj: object
Object exposing the Python buffer protocol or ``__cuda_array_interface__``.
Returns
-------
array: Array
An instance of the ``Array`` class.
"""
if isinstance(obj, Array):
return <Array>obj
else:
return Array(obj)
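The new asarray helper gives callers one entry point that wraps any buffer-protocol or __cuda_array_interface__ object in an Array and passes existing Array instances through unchanged. A short usage sketch follows, assuming the compiled extension is importable as ucxx._lib.arr (inferred from the file path; an installed package may expose it differently).

# Usage sketch for `asarray`; the import path is an assumption based on the
# source location python/ucxx/ucxx/_lib/arr.pyx.
from ucxx._lib.arr import Array, asarray

buf = bytearray(b"hello world")     # any object exposing the buffer protocol
arr = asarray(buf)                  # wrapped in an Array
assert isinstance(arr, Array)
assert arr.obj is buf               # original object stays reachable via .obj
assert asarray(arr) is arr          # no-op for objects that are already Arrays
print(arr.cuda)                     # expected False for a host buffer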