Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add meaningful error for out of disk exception during write #8886

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion distributed/shuffle/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
from distributed.protocol.serialize import ToPickle
from distributed.shuffle._comms import CommShardsBuffer
from distributed.shuffle._disk import DiskShardsBuffer
from distributed.shuffle._exceptions import P2PConsistencyError, ShuffleClosedError
from distributed.shuffle._exceptions import (
P2PConsistencyError,
P2POutOfDiskError,
ShuffleClosedError,
)
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._memory import MemoryShardsBuffer
from distributed.utils import run_in_executor_with_context, sync
Expand Down Expand Up @@ -508,6 +512,8 @@ def handle_transfer_errors(id: ShuffleId) -> Iterator[None]:
raise Reschedule()
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e

Expand All @@ -522,6 +528,8 @@ def handle_unpack_errors(id: ShuffleId) -> Iterator[None]:
raise Reschedule()
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"P2P shuffling {id} failed during unpack phase") from e

Expand Down
18 changes: 14 additions & 4 deletions distributed/shuffle/_disk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import errno
import pathlib
import shutil
import threading
Expand All @@ -12,7 +13,7 @@

from distributed.metrics import context_meter, thread_time
from distributed.shuffle._buffer import ShardsBuffer
from distributed.shuffle._exceptions import DataUnavailable
from distributed.shuffle._exceptions import DataUnavailable, P2POutOfDiskError
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._pickle import pickle_bytelist
from distributed.utils import Deadline, empty_context, log_errors, nbytes
Expand Down Expand Up @@ -177,12 +178,21 @@ async def _process(self, id: str, shards: list[Any]) -> None:
if self._closed:
raise RuntimeError("Already closed")

with open(self.directory / str(id), mode="ab") as f:
f.writelines(frames)

try:
self._write_frames(frames, id)
except OSError as e:
if e.errno == errno.ENOSPC:
raise P2POutOfDiskError from e
raise
context_meter.digest_metric("disk-write", 1, "count")
context_meter.digest_metric("disk-write", sum(map(nbytes, frames)), "bytes")

def _write_frames(
    self, frames: Iterable[bytes | bytearray | memoryview], id: str
) -> None:
    """Append ``frames`` to the on-disk file belonging to ``id``.

    The file lives at ``self.directory / str(id)`` and is opened in binary
    append mode, so repeated calls for the same ``id`` extend the same file.

    Parameters
    ----------
    frames
        Byte-like chunks written in iteration order.
    id
        Identifier used as the file name inside ``self.directory``.

    Raises
    ------
    OSError
        Whatever the underlying open/write raises; it is not handled here.
    """
    target = self.directory / str(id)
    with open(target, mode="ab") as sink:
        sink.writelines(frames)

def read(self, id: str) -> Any:
"""Read a complete file back into memory"""
self.raise_on_exception()
Expand Down
9 changes: 9 additions & 0 deletions distributed/shuffle/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ class ShuffleClosedError(P2PConsistencyError):

class DataUnavailable(Exception):
"""Raised when data is not available in the buffer"""


class P2POutOfDiskError(OSError):
    """Raised when P2P cannot store transferred data because the disk is full.

    The string form is a fixed, user-facing explanation; arguments passed to
    the constructor do not appear in it.
    """

    def __str__(self) -> str:
        """Return the fixed explanation together with the suggested remedy."""
        problem = (
            "P2P ran out of available disk space while temporarily storing transferred data. "
        )
        remedy = (
            "Please make sure that P2P has enough disk space available by increasing the number of "
            "workers or the size of the attached disk."
        )
        return problem + remedy
8 changes: 7 additions & 1 deletion distributed/shuffle/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
handle_transfer_errors,
handle_unpack_errors,
)
from distributed.shuffle._exceptions import DataUnavailable, P2PConsistencyError
from distributed.shuffle._exceptions import (
DataUnavailable,
P2PConsistencyError,
P2POutOfDiskError,
)
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
from distributed.sizeof import sizeof
Expand Down Expand Up @@ -107,6 +111,8 @@ def shuffle_barrier(id: ShuffleId, run_ids: list[int]) -> int:
raise e
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e

Expand Down
33 changes: 32 additions & 1 deletion distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import contextlib
import errno
import itertools
import logging
import os
Expand All @@ -21,6 +22,7 @@
from dask.utils import key_split

from distributed.shuffle._core import ShuffleId, ShuffleRun, barrier_key
from distributed.shuffle._disk import DiskShardsBuffer
from distributed.worker import Status

np = pytest.importorskip("numpy")
Expand All @@ -47,7 +49,7 @@
read_from_disk,
serialize_table,
)
from distributed.shuffle._exceptions import P2PConsistencyError
from distributed.shuffle._exceptions import P2PConsistencyError, P2POutOfDiskError
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
from distributed.shuffle._shuffle import (
Expand Down Expand Up @@ -2039,6 +2041,35 @@ async def _receive(self, data: list[tuple[int, bytes]]) -> None:
await asyncio.gather(*[s.close() for s in [sA, sB]])


# Checks that a disk-full failure while writing shards during a P2P shuffle is
# reported as P2POutOfDiskError with a message mentioning the lack of disk space.
# NOTE(review): leading indentation is absent in this view, so the exact nesting
# of the `with` blocks and the cleanup awaits below cannot be confirmed from here.
@gen_cluster(client=True)
async def test_meaningful_out_of_disk_error(c, s, a, b):
# Disk buffer whose write hook always fails with an ENOSPC OSError.
class OutOfDiskShardsBuffer(DiskShardsBuffer):
def _write_frames(self, frames, id):
code = errno.ENOSPC
raise OSError(code, os.strerror(code))

# Input dataframe that will be shuffled.
df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-10",
dtypes={"x": float, "y": float},
freq="10 s",
)
# Select the P2P shuffle method with the disk option switched on.
with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": True}
):
shuffled = df.shuffle("x", npartitions=10)
# Computing must raise the dedicated error; `match` checks its message text.
with pytest.raises(P2POutOfDiskError, match="out of available disk space"):
# Swap in the always-failing buffer under the name looked up in _core.
with mock.patch(
"distributed.shuffle._core.DiskShardsBuffer",
OutOfDiskShardsBuffer,
):
await c.compute(shuffled)
# Run the cleanup assertions for both workers, then for the scheduler once
# the client has been closed.
await assert_worker_cleanup(a)
await assert_worker_cleanup(b)
await c.close()
await assert_scheduler_cleanup(s)


class BlockedShuffleReceiveShuffleWorkerPlugin(ShuffleWorkerPlugin):
def setup(self, worker: Worker) -> None:
super().setup(worker)
Expand Down
Loading