diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py index 4eaea179df..6de5ee0ffb 100644 --- a/distributed/shuffle/_core.py +++ b/distributed/shuffle/_core.py @@ -36,7 +36,11 @@ from distributed.protocol.serialize import ToPickle from distributed.shuffle._comms import CommShardsBuffer from distributed.shuffle._disk import DiskShardsBuffer -from distributed.shuffle._exceptions import P2PConsistencyError, ShuffleClosedError +from distributed.shuffle._exceptions import ( + P2PConsistencyError, + P2POutOfDiskError, + ShuffleClosedError, +) from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._memory import MemoryShardsBuffer from distributed.utils import run_in_executor_with_context, sync @@ -508,6 +512,8 @@ def handle_transfer_errors(id: ShuffleId) -> Iterator[None]: raise Reschedule() except P2PConsistencyError: raise + except P2POutOfDiskError: + raise except Exception as e: raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e @@ -522,6 +528,8 @@ def handle_unpack_errors(id: ShuffleId) -> Iterator[None]: raise Reschedule() except P2PConsistencyError: raise + except P2POutOfDiskError: + raise except Exception as e: raise RuntimeError(f"P2P shuffling {id} failed during unpack phase") from e diff --git a/distributed/shuffle/_disk.py b/distributed/shuffle/_disk.py index bed442705c..4dd05de1b6 100644 --- a/distributed/shuffle/_disk.py +++ b/distributed/shuffle/_disk.py @@ -1,6 +1,7 @@ from __future__ import annotations import contextlib +import errno import pathlib import shutil import threading @@ -12,7 +13,7 @@ from distributed.metrics import context_meter, thread_time from distributed.shuffle._buffer import ShardsBuffer -from distributed.shuffle._exceptions import DataUnavailable +from distributed.shuffle._exceptions import DataUnavailable, P2POutOfDiskError from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._pickle import pickle_bytelist from distributed.utils import Deadline, empty_context, log_errors, nbytes @@ -177,12 +178,21 @@ async def _process(self, id: str, shards: list[Any]) -> None: if self._closed: raise RuntimeError("Already closed") - with open(self.directory / str(id), mode="ab") as f: - f.writelines(frames) - + try: + self._write_frames(frames, id) + except OSError as e: + if e.errno == errno.ENOSPC: + raise P2POutOfDiskError from e + raise context_meter.digest_metric("disk-write", 1, "count") context_meter.digest_metric("disk-write", sum(map(nbytes, frames)), "bytes") + def _write_frames( + self, frames: Iterable[bytes | bytearray | memoryview], id: str + ) -> None: + with open(self.directory / str(id), mode="ab") as f: + f.writelines(frames) + def read(self, id: str) -> Any: """Read a complete file back into memory""" self.raise_on_exception() diff --git a/distributed/shuffle/_exceptions.py b/distributed/shuffle/_exceptions.py index 27bdb8dd56..01dba17363 100644 --- a/distributed/shuffle/_exceptions.py +++ b/distributed/shuffle/_exceptions.py @@ -15,3 +15,12 @@ class ShuffleClosedError(P2PConsistencyError): class DataUnavailable(Exception): """Raised when data is not available in the buffer""" + + +class P2POutOfDiskError(OSError): + def __str__(self) -> str: + return ( + "P2P ran out of available disk space while temporarily storing transferred data. " + "Please make sure that P2P has enough disk space available by increasing the number of " + "workers or the size of the attached disk." + ) diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 154f25c4ba..27aa33d8be 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -48,7 +48,11 @@ handle_transfer_errors, handle_unpack_errors, ) -from distributed.shuffle._exceptions import DataUnavailable, P2PConsistencyError +from distributed.shuffle._exceptions import ( + DataUnavailable, + P2PConsistencyError, + P2POutOfDiskError, +) from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin from distributed.sizeof import sizeof @@ -107,6 +111,8 @@ def shuffle_barrier(id: ShuffleId, run_ids: list[int]) -> int: raise e except P2PConsistencyError: raise + except P2POutOfDiskError: + raise except Exception as e: raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 23e61eef2d..7d44bf6921 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2,6 +2,7 @@ import asyncio import contextlib +import errno import itertools import logging import os @@ -21,6 +22,7 @@ from dask.utils import key_split from distributed.shuffle._core import ShuffleId, ShuffleRun, barrier_key +from distributed.shuffle._disk import DiskShardsBuffer from distributed.worker import Status np = pytest.importorskip("numpy") @@ -47,7 +49,7 @@ read_from_disk, serialize_table, ) -from distributed.shuffle._exceptions import P2PConsistencyError +from distributed.shuffle._exceptions import P2PConsistencyError, P2POutOfDiskError from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin from distributed.shuffle._shuffle import ( @@ -2039,6 +2041,35 @@ async def _receive(self, data: list[tuple[int, bytes]]) -> None: await asyncio.gather(*[s.close() for s in [sA, sB]]) +@gen_cluster(client=True) +async def test_meaningful_out_of_disk_error(c, s, a, b): + class OutOfDiskShardsBuffer(DiskShardsBuffer): + def _write_frames(self, frames, id): + code = errno.ENOSPC + raise OSError(code, os.strerror(code)) + + df = dask.datasets.timeseries( + start="2000-01-01", + end="2000-01-10", + dtypes={"x": float, "y": float}, + freq="10 s", + ) + with dask.config.set( + {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": True} + ): + shuffled = df.shuffle("x", npartitions=10) + with pytest.raises(P2POutOfDiskError, match="out of available disk space"): + with mock.patch( + "distributed.shuffle._core.DiskShardsBuffer", + OutOfDiskShardsBuffer, + ): + await c.compute(shuffled) + await assert_worker_cleanup(a) + await assert_worker_cleanup(b) + await c.close() + await assert_scheduler_cleanup(s) + + class BlockedShuffleReceiveShuffleWorkerPlugin(ShuffleWorkerPlugin): def setup(self, worker: Worker) -> None: super().setup(worker)