diff --git a/build.sh b/build.sh
index ea63b566c..bba4e8be2 100755
--- a/build.sh
+++ b/build.sh
@@ -143,6 +143,13 @@ build_p2p() {
   else
     echo "[container] USE_TCPX=1, skipping copying p2p runtime files"
   fi
+  if [[ "$TARGET" == rocm* ]]; then
+    cd thirdparty/dietgpu
+    rm -rf build/
+    python3 setup.py build
+    cd ../..
+    find thirdparty/dietgpu/build -name '*.so' -exec cp {} uccl/ \;
+  fi
 }
 
 build_ep() {
@@ -180,18 +187,17 @@ build_eccl() {
   set -euo pipefail
   echo "[container] build_eccl Target: $TARGET"
 
-  cd eccl
+  cd experimental/eccl
   if [[ "$TARGET" == cuda* ]]; then
-    echo "Skipping eccl build on Cuda."
-    return
+    make clean -f Makefile && make -j$(nproc) -f Makefile
   elif [[ "$TARGET" == rocm* ]]; then
     make clean -f Makefile.rocm && make -j$(nproc) -f Makefile.rocm
   fi
-  cd ..
+  cd ../..
 
   echo "[container] Copying eccl .so to uccl/"
-  # mkdir -p uccl/lib
-  # cp eccl/eccl.*.so uccl/
+  mkdir -p uccl/lib # mkdir anyway
+  cp experimental/eccl/*eccl*.so uccl/lib
 }
 
 # Determine the Docker image to use based on the target and architecture
@@ -250,6 +256,44 @@ else
 fi
 
 echo "[2/3] Running build inside container..."
+
+# Auto-detect CUDA architecture for ep build
+DETECTED_GPU_ARCH=""
+if [[ "$BUILD_TYPE" =~ (ep|all|p2p) ]]; then
+  if [[ "$TARGET" == cuda* ]] && command -v nvidia-smi &> /dev/null; then
+    DETECTED_GPU_ARCH="$(nvidia-smi --query-gpu=compute_cap --format=csv,noheader 2>/dev/null | head -n1 | tr -d ' ' || true)"
+
+    if [[ -n "$DETECTED_GPU_ARCH" ]]; then
+      echo "Auto-detected CUDA compute capability: ${DETECTED_GPU_ARCH}"
+    fi
+  elif [[ "$TARGET" == rocm* ]] && command -v amd-smi &> /dev/null; then
+    # Check if jq is installed, install via pip if not
+    if ! command -v jq &> /dev/null; then
+      echo "jq not found, installing via pip..."
+      pip install jq  # FIXME(review): PyPI 'jq' is Python bindings only; it does NOT provide the jq CLI this pipeline needs — install jq via the system package manager
+    fi
+    DETECTED_GPU_ARCH="$(
+      PYTHONWARNINGS=ignore \
+      amd-smi static -g 0 --asic --json 2>/dev/null \
+      | jq -r '
+          if .gpu_data and (.gpu_data | length > 0) then
+            .gpu_data[0].asic.target_graphics_version
+          else
+            empty
+          end
+        ' \
+      || true
+    )"
+    if [[ -n "$DETECTED_GPU_ARCH" ]]; then
+      echo "Auto-detected ROCm architecture: ${DETECTED_GPU_ARCH}"
+    fi
+  else
+    echo "[INFO] No compatible GPU detection tool found, skipping auto-detect"
+  fi
+fi
+
+export TORCH_CUDA_ARCH_LIST="${TORCH_CUDA_ARCH_LIST:-${DETECTED_GPU_ARCH}}"
+
 docker run --rm --user "$(id -u):$(id -g)" \
   -v /etc/passwd:/etc/passwd:ro \
   -v /etc/group:/etc/group:ro \
@@ -263,6 +307,8 @@ docker run --rm --user "$(id -u):$(id -g)" \
   -e WHEEL_DIR="${WHEEL_DIR}" \
   -e BUILD_TYPE="${BUILD_TYPE}" \
   -e USE_TCPX="${USE_TCPX:-0}" \
+  -e USE_EFA="${USE_EFA:-0}" \
+  -e USE_IB="${USE_IB:-0}" \
   -e MAKE_NORMAL_MODE="${MAKE_NORMAL_MODE:-}" \
   -e FUNCTION_DEF="$(declare -f build_rccl_nccl_h build_rdma build_efa build_p2p build_ep build_eccl)" \
   -w /io \
@@ -346,7 +392,15 @@ def initialize():
     mv ${BACKUP_FN} setup.py
 fi
 
-auditwheel repair dist/uccl-*.whl --exclude "libtorch*.so" --exclude "libc10*.so" --exclude "libibverbs.so.1" --exclude "libcudart.so.12" --exclude "libamdhip64.so.*" --exclude "libcuda.so.1" -w /io/${WHEEL_DIR}
+auditwheel repair dist/uccl-*.whl \
+    --exclude "libtorch*.so" \
+    --exclude "libc10*.so" \
+    --exclude "libibverbs.so.1" \
+    --exclude "libcudart.so.12" \
+    --exclude "libamdhip64.so.*" \
+    --exclude "libcuda.so.1" \
+    --exclude "libefa.so.1" \
+    -w /io/${WHEEL_DIR}
 
 # Add backend tag to wheel filename using local version identifier
 if [[ "$TARGET" == rocm* || "$TARGET" == "therock" ]]; then
@@ -378,4 +432,4 @@ def initialize():
 # 3. 
Done
 echo "[3/3] Wheel built successfully (stored in ${WHEEL_DIR}):"
-ls -lh "${WHEEL_DIR}"/uccl-*.whl || true
+ls -lh "${WHEEL_DIR}"/uccl-*.whl || true
\ No newline at end of file
diff --git a/build_and_install.sh b/build_and_install.sh
index bba4595d8..06dab29d8 100755
--- a/build_and_install.sh
+++ b/build_and_install.sh
@@ -24,3 +24,16 @@ else
     # That (currently) requires --extra-index-url
     pip install --extra-index-url ${ROCM_IDX_URL} $(ls wheelhouse-$TARGET/uccl-*.whl)[rocm]
 fi
+
+UCCL_INSTALL_PATH=$(pip show uccl 2>/dev/null | grep "^Location:" | cut -d' ' -f2 || echo "")
+if [[ -n "$UCCL_INSTALL_PATH" && -d "$UCCL_INSTALL_PATH" ]]; then
+    UCCL_PACKAGE_PATH="$UCCL_INSTALL_PATH/uccl"
+    if [[ -d "$UCCL_PACKAGE_PATH" ]]; then
+        echo "UCCL installed at: $UCCL_PACKAGE_PATH"
+        echo "Set LIBRARY_PATH: export LIBRARY_PATH=\"$UCCL_PACKAGE_PATH/lib:\$LIBRARY_PATH\""
+    else
+        echo "UCCL package directory not found at: $UCCL_PACKAGE_PATH"
+    fi
+else
+    echo "Warning: Could not detect UCCL installation path"
+fi
diff --git a/docker/Dockerfile.rocm b/docker/Dockerfile.rocm
index a7b12cb0b..e1ce36ac9 100644
--- a/docker/Dockerfile.rocm
+++ b/docker/Dockerfile.rocm
@@ -11,11 +11,11 @@ ENV DEBIAN_FRONTEND=noninteractive
 RUN apt-get update && \
     apt-get install -y --no-install-recommends \
     build-essential cmake git ninja-build g++ make patchelf \
-    rdma-core libibverbs-dev \
+    rdma-core libibverbs-dev libnuma-dev \
     libgoogle-glog-dev libgflags-dev libgtest-dev libelf-dev \
-    libnuma-dev libdrm-dev libdrm-amdgpu1 \
-    pkg-config zlib1g-dev curl \
-    software-properties-common && \
+    pkg-config zlib1g-dev curl unzip \
+    software-properties-common \
+    hipcub && \
     \
     # ───── Add Python ${PY_VER} PPA & install Python ${PY_VER} + setuptools ─────
     add-apt-repository ppa:deadsnakes/ppa && \
diff --git a/ep/bench/run_ep.sh b/ep/bench/run_ep.sh
new file mode 100755
index 000000000..2d973dd29
--- /dev/null
+++ b/ep/bench/run_ep.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+NNODES=${1:-2}
+RANK=${2:-0}
+MODE=${3:-ll} # ll, ht, ll-pressure, ht-pressure +MAIN_IP=${4:-10.1.18.53} + +export OMP_NUM_THREADS=6 +echo "Running nodes $NNODES, rank $RANK, mode $MODE, main IP $MAIN_IP" + +if [ "$MODE" = "ll" ]; then + torchrun --nnodes=$NNODES --nproc_per_node=8 --node_rank=$RANK \ + --master_addr=$MAIN_IP --master_port=12355 \ + test_low_latency.py --num-tokens=128 \ + --hidden=7168 --num-topk=8 --num-experts=288 --pressure-test-mode=2 +elif [ "$MODE" = "ht" ]; then + torchrun --nnodes=$NNODES --nproc_per_node=8 --node_rank=$RANK \ + --master_addr=$MAIN_IP --master_port=12355 \ + test_internode.py --num-tokens=4096 \ + --hidden=7168 --num-topk=8 --num-experts=288 --test-ll-compatibility +elif [ "$MODE" = "ll-pressure" ]; then + torchrun --nnodes=$NNODES --nproc_per_node=8 --node_rank=$RANK \ + --master_addr=$MAIN_IP --master_port=12355 \ + test_low_latency.py --num-tokens=128 \ + --hidden=7168 --num-topk=8 --num-experts=288 --pressure-test-mode=2 --debug-hash +elif [ "$MODE" = "ht-pressure" ]; then + torchrun --nnodes=$NNODES --nproc_per_node=8 --node_rank=$RANK \ + --master_addr=$MAIN_IP --master_port=12355 \ + test_internode.py --num-tokens=4096 \ + --hidden=7168 --num-topk=8 --num-experts=288 --test-ll-compatibility --pressure-test-mode=2 +fi +# --log-dir=logs --redirect=3 \ No newline at end of file diff --git a/ep/bench/test_low_latency.py b/ep/bench/test_low_latency.py index 7fe93ea5c..e8661c311 100644 --- a/ep/bench/test_low_latency.py +++ b/ep/bench/test_low_latency.py @@ -86,6 +86,8 @@ def test_main( buffer: Buffer, use_logfmt: bool = False, seed: int = 0, + skip_benchmark: bool = False, + debug_hash: bool = False, ): torch.manual_seed(seed + rank) random.seed(seed + rank) @@ -135,6 +137,20 @@ def test_main( # Check dispatch correctness do_check = True hash_value, num_times = 0, 0 + # Optional per-tensor hash breakdown to localize non-determinism. 
+ hash_details = {} if debug_hash else None + + def _record_hash(label: str, t: torch.Tensor, include_in_overall: bool = True): + nonlocal hash_value + if not t.is_contiguous(): + t = t.contiguous() + hv = hash_tensor(t) + if include_in_overall: + hash_value ^= hv + if hash_details is not None: + # Preserve the XOR aggregation behavior at per-label granularity. + hash_details[label] = hash_details.get(label, 0) ^ hv + for current_x in x_list: for return_recv_hook in (False, True): for dispatch_use_fp8 in (False, True): @@ -270,16 +286,59 @@ def test_main( + rank_offset ).sum().item() == 0 if dispatch_use_fp8: - hash_value ^= hash_tensor( - packed_recv_x[0][i, :num_valid_tokens] + tag = ( + f"x={'x' if current_x is x else 'rand'}" + f"|hook={return_recv_hook}" + f"|fp8={dispatch_use_fp8}" + f"|rs={round_scale}" + f"|ue={use_ue8m0}" + f"|le={i}" + f"|nvt={num_valid_tokens}" + ) + _record_hash( + f"dispatch_fp8_data|{tag}", + packed_recv_x[0][i, :num_valid_tokens], ) - hash_value ^= hash_tensor( - packed_recv_x[1][i, :num_valid_tokens] + _record_hash( + f"dispatch_fp8_scale|{tag}", + packed_recv_x[1][i, :num_valid_tokens], ) else: - hash_value ^= hash_tensor( - packed_recv_x[i, :num_valid_tokens] + tag = ( + f"x={'x' if current_x is x else 'rand'}" + f"|hook={return_recv_hook}" + f"|fp8={dispatch_use_fp8}" + f"|rs={round_scale}" + f"|ue={use_ue8m0}" + f"|le={i}" + f"|nvt={num_valid_tokens}" ) + _record_hash( + f"dispatch_bf16|{tag}", + packed_recv_x[i, :num_valid_tokens], + ) + # Also record metadata that defines token placement/order, + # but do NOT include it in the overall hash_value (so we + # don't change the existing determinism criterion). + # + # If these differ while data differs, it's "assignment/order" + # non-determinism. If these match but data differs, it's + # "payload/visibility/cast" non-determinism. 
+ _record_hash( + f"dispatch_meta_count|{tag}", + packed_recv_count[i], + include_in_overall=False, + ) + _record_hash( + f"dispatch_meta_src_info|{tag}", + recv_src_info[:num_valid_tokens], + include_in_overall=False, + ) + _record_hash( + f"dispatch_meta_layout_range|{tag}", + recv_layout_range, + include_in_overall=False, + ) # Check combine correctness for zero_copy in (False,) if use_logfmt else (False, True): if zero_copy: @@ -319,7 +378,16 @@ def test_main( assert diff < ( 9e-4 if dispatch_use_fp8 else 1e-5 ), f"Error: {diff=}, {dispatch_use_fp8=}, {zero_copy=}" - hash_value ^= hash_tensor(combined_x) + tag = ( + f"x={'x' if current_x is x else 'rand'}" + f"|hook={return_recv_hook}" + f"|fp8={dispatch_use_fp8}" + f"|rs={round_scale}" + f"|ue={use_ue8m0}" + f"|zc={zero_copy}" + f"|logfmt={use_logfmt}" + ) + _record_hash(f"combine_out|{tag}", combined_x) # noinspection PyShadowingNames def large_gemm_with_hook(hook): @@ -353,6 +421,10 @@ def test_func(return_recv_hook: bool): dist.barrier(group=group) print("✓ All correctness tests passed!", flush=True) + + if skip_benchmark: + return (hash_value, hash_details) if debug_hash else hash_value + # Calculate bandwidth num_fp8_bytes, num_bf16_bytes = (hidden + hidden / 128 * 4 + 16), hidden * 2 num_logfmt10_bytes = hidden * 10 / 8 + hidden / 128 * 4 @@ -393,7 +465,7 @@ def test_func(return_recv_hook: bool): f"Combine send/recv time: {combine_t[0] * 1e6:.2f} + {combine_t[1] * 1e6:.2f} us", flush=True, ) - return hash_value + return (hash_value, hash_details) if debug_hash else hash_value # noinspection PyUnboundLocalVariable,PyShadowingNames @@ -414,24 +486,12 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace): explicitly_destroy=True, allow_mnnvl=args.allow_mnnvl, ) - test_main( - num_tokens, - hidden, - num_experts, - num_topk, - rank, - num_ranks, - group, - buffer, - use_logfmt=args.use_logfmt, - seed=1, - ) - do_pressure_test = args.pressure_test - for seed in range(int(1e9) if 
do_pressure_test else 0): + for seed in range(int(1e9)): if local_rank == 0: print(f"Testing with seed {seed} ...", flush=True) - ref_hash = test_main( + torch.manual_seed(rank + seed) + ref_out = test_main( num_tokens, hidden, num_experts, @@ -442,23 +502,72 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace): buffer, use_logfmt=args.use_logfmt, seed=seed, + skip_benchmark=args.pressure_test_mode == 1, + debug_hash=args.debug_hash, ) - for i in range(20): - assert ( - test_main( - num_tokens, - hidden, - num_experts, - num_topk, - rank, - num_ranks, - group, - buffer, - use_logfmt=args.use_logfmt, - seed=seed, + if args.debug_hash: + ref_hash, ref_hash_details = ref_out + else: + ref_hash, ref_hash_details = ref_out, None + if args.pressure_test_mode == 0: + break + + if local_rank == 0: + print(f"{ref_hash=}") + print("", flush=True) + + for _ in range(20): + torch.manual_seed(rank + seed) + cur_out = test_main( + num_tokens, + hidden, + num_experts, + num_topk, + rank, + num_ranks, + group, + buffer, + use_logfmt=args.use_logfmt, + seed=seed, + skip_benchmark=args.pressure_test_mode == 1, + debug_hash=args.debug_hash, + ) + if args.debug_hash: + current_hash, current_hash_details = cur_out + else: + current_hash, current_hash_details = cur_out, None + + if current_hash != ref_hash: + print( + f"[rank {rank} local_rank {local_rank}] NON-DETERMINISM: " + f"seed={seed} current_hash={current_hash} ref_hash={ref_hash}", + flush=True, ) - == ref_hash - ), f"Error: seed={seed}" + if args.debug_hash and ref_hash_details and current_hash_details: + diffs = [] + keys = set(ref_hash_details.keys()) | set(current_hash_details.keys()) + for k in sorted(keys): + a = ref_hash_details.get(k, 0) + b = current_hash_details.get(k, 0) + if a != b: + diffs.append((k, a, b)) + if diffs: + k0, a0, b0 = diffs[0] + print( + f"[rank {rank}] First differing tensor: {k0}\n" + f" ref={a0} cur={b0}\n" + f"[rank {rank}] Total differing labels: {len(diffs)}", + 
flush=True, + ) + for (k, a, b) in diffs[:10]: + print(f"[rank {rank}] DIFF {k} ref={a} cur={b}", flush=True) + else: + print( + f"[rank {rank}] Hash differs but no per-label diffs " + f"(possible XOR collision).", + flush=True, + ) + # assert current_hash == ref_hash, f"Error: seed={seed}" # Destroy the buffer runtime and communication group buffer.destroy() @@ -498,7 +607,15 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace): "--use-logfmt", action="store_true", help="Whether to test LogFMT combine" ) parser.add_argument( - "--pressure-test", action="store_true", help="Whether to do pressure test" + "--pressure-test-mode", + type=int, + default=0, + help="Pressure test mode. 0: don't do pressure test, 1: do pressure test without benchmarks, 2: do pressure test with benchmarks", + ) + parser.add_argument( + "--debug-hash", + action="store_true", + help="Print per-tensor hash breakdown when non-determinism is detected.", ) args = parser.parse_args() @@ -506,4 +623,4 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace): # NOTE: modified from deep_ep local_rank = int(os.environ["LOCAL_RANK"]) num_local_ranks = int(os.environ.get("LOCAL_WORLD_SIZE", 1)) - test_loop(local_rank, num_local_ranks, args) + test_loop(local_rank, num_local_ranks, args) \ No newline at end of file diff --git a/ep/bench/utils.py b/ep/bench/utils.py index c664973fe..4a1bac830 100644 --- a/ep/bench/utils.py +++ b/ep/bench/utils.py @@ -40,7 +40,14 @@ def calc_diff(x: torch.Tensor, y: torch.Tensor): def hash_tensor(t: torch.Tensor): - return t.view(torch.int64).sum().item() + # Robust hash that works for any dtype/shape (including 0-d scalars). + # We hash raw bytes to avoid dtype-size assumptions (e.g., int32 -> int64). 
+ if not t.is_contiguous(): + t = t.contiguous() + if t.dim() == 0: + t = t.reshape(1) + u8 = t.view(torch.uint8) + return int(u8.sum(dtype=torch.int64).item()) def init_dist(local_rank: int, num_local_ranks: int): diff --git a/ep/include/ep_utils.cuh b/ep/include/ep_utils.cuh index 10c40d276..aed9e4e1c 100644 --- a/ep/include/ep_utils.cuh +++ b/ep/include/ep_utils.cuh @@ -785,8 +785,8 @@ __forceinline__ __device__ int atomic_cas_cta_acquire(int* addr, int x, int y) { #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) // TODO: __hip_atomic_compare_exchange_strong or // __hip_atomic_compare_exchange_weak - __hip_atomic_compare_exchange_strong(addr, &x, y, __ATOMIC_ACQUIRE, - __ATOMIC_RELAXED, + __hip_atomic_compare_exchange_strong(addr, &x, y, __ATOMIC_ACQ_REL, + __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP); return x; #else diff --git a/ep/src/internode_ll.cu b/ep/src/internode_ll.cu index 39ae1d610..3358734cf 100644 --- a/ep/src/internode_ll.cu +++ b/ep/src/internode_ll.cu @@ -210,6 +210,7 @@ __global__ __launch_bounds__(1024, 1) void dispatch( dst_rank, max_nvl_peers, 0) : 0; if (dst_p2p_ptr == 0) { + // NOTE(Ziming): the original deepEP even doesn't have this. __threadfence_system(); uccl::nvshmemi_ibgda_put_nbi_warp( dst_ptr - reinterpret_cast(rdma_buffer_ptr),