From ce97cce54a9bca5c19e1e5028eea2656344e903f Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Mon, 8 Jan 2024 14:22:31 +0100
Subject: [PATCH 1/4] Configuration

---
 distributed/config.py                         |  1 +
 distributed/distributed-schema.yaml           | 48 +++++++++++++++++--
 distributed/distributed.yaml                  |  8 +++-
 distributed/shuffle/_buffer.py                |  1 +
 distributed/shuffle/_comms.py                 |  9 ++--
 distributed/shuffle/_core.py                  | 11 ++++-
 distributed/shuffle/_rechunk.py               |  2 +-
 distributed/shuffle/_shuffle.py               |  2 +-
 distributed/shuffle/_worker_plugin.py         | 12 +++--
 distributed/shuffle/tests/test_comm_buffer.py | 25 ++++++++--
 distributed/shuffle/tests/test_merge.py       |  2 +-
 distributed/shuffle/tests/test_rechunk.py     |  6 +--
 distributed/shuffle/tests/test_shuffle.py     |  2 +-
 13 files changed, 101 insertions(+), 28 deletions(-)

diff --git a/distributed/config.py b/distributed/config.py
index b6e5f3d0d3..84de644c70 100644
--- a/distributed/config.py
+++ b/distributed/config.py
@@ -57,6 +57,7 @@
     "distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length",
     "recent-messages-log-length": "distributed.admin.low-level-log-length",
     "distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length",
+    "distributed.p2p.disk": "distributed.p2p.storage.disk",
 }

 # Affects yaml and env variables configs, as well as calls to dask.config.set()

diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml
index fffb9b3772..1f47416f39 100644
--- a/distributed/distributed-schema.yaml
+++ b/distributed/distributed-schema.yaml
@@ -1034,6 +1034,28 @@ properties:
             description: Configuration settings for Dask communications specific to P2P
             properties:

+              buffer:
+                type:
+                - string
+                - integer
+                description: |
+                  The maximum amount of data for P2P's comm buffers to buffer in memory per worker.
+
+                  This limit is not absolute but is used to apply back pressure.
+              concurrency:
+                type: integer
+                description: Number of concurrent background tasks used for IO-intensive operations per P2P comm buffer.
+              message-bytes-limit:
+                type:
+                - string
+                - integer
+                description: |
+                  The maximum amount of data for P2P to send to another worker in a single operation.
+
+                  Data is sent in batches, and if the first shard is larger than this value,
+                  that shard will still be sent to ensure progress. Hence, this limit is not absolute.
+
+                  Note that this limit applies to a single send operation and a worker may send data to
+                  multiple workers in parallel.
               retry:
                 type: object
                 description: |
                   Settings for retrying P2P operations

                 properties:
                   count:
                     type: integer
                     description: The number of times to retry a P2P operation before considering it failed
                   delay:
                     type: object
                     properties:
                       min:
                         type: string
                         description: The minimum delay between retries
                       max:
                         type: string
                         description: The maximum delay between retries
-          disk:
-            type: boolean
-            description: |
-              Whether or not P2P stores intermediate data on disk instead of memory
+
+          storage:
+            type: object
+            description: Configuration settings for P2P storage
+            properties:
+
+              buffer:
+                type:
+                - string
+                - integer
+                description: |
+                  The maximum amount of data for P2P's disk buffers to buffer in memory per worker.
+
+                  This limit is not absolute but is used to apply back pressure.
+              disk:
+                type: boolean
+                description: |
+                  Whether or not P2P stores intermediate data on disk instead of in memory.
+          threads:
+            type:
+            - integer
+            - "null"
+            description: Number of threads used for CPU-intensive operations per worker. Defaults to the number of worker threads.

       dashboard:
         type: object
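For orientation, the options described in the schema above can be set like any other dask config value. A minimal usage sketch, not part of the patch; the values shown are the defaults added to distributed.yaml below:

    import dask

    with dask.config.set(
        {
            "distributed.p2p.comm.buffer": "100 MiB",  # soft cap for in-memory comm shards
            "distributed.p2p.comm.concurrency": 10,  # concurrent background send tasks
            "distributed.p2p.comm.message-bytes-limit": "2 MiB",  # soft cap per send operation
            "distributed.p2p.storage.buffer": "1 GiB",  # soft cap for the disk write buffer
            "distributed.p2p.storage.disk": True,  # spill intermediate shuffle data to disk
            "distributed.p2p.threads": None,  # None falls back to the worker's thread count
        }
    ):
        ...  # run P2P shuffles/rechunks here

Per the comment in config.py above, the rename entry added there also redirects the old "distributed.p2p.disk" key, whether it arrives from yaml, environment variables, or dask.config.set(), to its new home at "distributed.p2p.storage.disk".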
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 8f30c69363..be9ba7eb69 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -299,12 +299,18 @@ distributed:
   p2p:
     comm:
+      buffer: 100 MiB
+      concurrency: 10
+      message-bytes-limit: 2 MiB
       retry:
         count: 10
         delay:
           min: 1s # the first non-zero delay between re-tries
           max: 30s # the maximum delay between re-tries
-    disk: True
+    storage:
+      buffer: 1 GiB
+      disk: True
+    threads: null

   ###################
   # Bokeh dashboard #

diff --git a/distributed/shuffle/_buffer.py b/distributed/shuffle/_buffer.py
index 60cc0a86c4..6c8dfa79c2 100644
--- a/distributed/shuffle/_buffer.py
+++ b/distributed/shuffle/_buffer.py
@@ -150,6 +150,7 @@ def _continue() -> bool:
         if self.max_message_size > 0:
             size = 0
             shards = []
+            # FIXME: We always exceed the limit, not just on the first shard.
             while size < self.max_message_size:
                 try:
                     shard = self.shards[part_id].pop()

diff --git a/distributed/shuffle/_comms.py b/distributed/shuffle/_comms.py
index 50094afddd..359d9ad13a 100644
--- a/distributed/shuffle/_comms.py
+++ b/distributed/shuffle/_comms.py
@@ -3,8 +3,6 @@
 from collections.abc import Awaitable, Callable
 from typing import Any

-from dask.utils import parse_bytes
-
 from distributed.metrics import context_meter
 from distributed.shuffle._disk import ShardsBuffer
 from distributed.shuffle._limiter import ResourceLimiter
@@ -49,18 +47,17 @@ class CommShardsBuffer(ShardsBuffer):
         Number of background tasks to run.
     """

-    max_message_size = parse_bytes("2 MiB")
-
     def __init__(
         self,
         send: Callable[[str, list[tuple[Any, Any]]], Awaitable[None]],
         memory_limiter: ResourceLimiter,
-        concurrency_limit: int = 10,
+        max_message_size: int,
+        concurrency_limit: int,
     ):
         super().__init__(
             memory_limiter=memory_limiter,
             concurrency_limit=concurrency_limit,
-            max_message_size=CommShardsBuffer.max_message_size,
+            max_message_size=max_message_size,
         )
         self.send = send

diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py
index dfb4e8853c..6d223c6862 100644
--- a/distributed/shuffle/_core.py
+++ b/distributed/shuffle/_core.py
@@ -19,7 +19,7 @@
 import dask.config
 from dask.core import flatten
 from dask.typing import Key
-from dask.utils import parse_timedelta
+from dask.utils import parse_bytes, parse_timedelta

 from distributed.core import PooledRPCCall
 from distributed.exceptions import Reschedule
@@ -115,8 +115,15 @@ def __init__(
             self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)

         with self._capture_metrics("background-comms"):
+            max_message_size = parse_bytes(
+                dask.config.get("distributed.p2p.comm.message-bytes-limit")
+            )
+            concurrency_limit = dask.config.get("distributed.p2p.comm.concurrency")
             self._comm_buffer = CommShardsBuffer(
-                send=self.send, memory_limiter=memory_limiter_comms
+                send=self.send,
+                max_message_size=max_message_size,
+                memory_limiter=memory_limiter_comms,
+                concurrency_limit=concurrency_limit,
             )

         # TODO: reduce number of connections to number of workers
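The FIXME added in _buffer.py refers to the batching rule that makes message-bytes-limit a soft limit rather than a hard one. A simplified, illustrative sketch of that loop (not the actual buffer implementation):

    def take_batch(shards: list[bytes], max_message_size: int) -> list[bytes]:
        """Pop shards into one outgoing batch until the size limit is hit.

        The size check happens before each pop, so the shard that crosses the
        limit is still included. An oversized first shard therefore still
        makes progress, and any batch may overshoot the limit by one shard.
        """
        batch: list[bytes] = []
        size = 0
        while shards and size < max_message_size:
            shard = shards.pop()
            batch.append(shard)
            size += len(shard)
        return batch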
diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py
index 6bb6e3f069..0e910dce1d 100644
--- a/distributed/shuffle/_rechunk.py
+++ b/distributed/shuffle/_rechunk.py
@@ -393,7 +393,7 @@ def partial_rechunk(
     _barrier_key = barrier_key(ShuffleId(partial_token))
     slice_name = f"rechunk-slice-{partial_token}"
     transfer_name = f"rechunk-transfer-{partial_token}"
-    disk: bool = dask.config.get("distributed.p2p.disk")
+    disk: bool = dask.config.get("distributed.p2p.storage.disk")
     ndim = len(x.shape)

diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py
index 7d0247336e..fa367fa09d 100644
--- a/distributed/shuffle/_shuffle.py
+++ b/distributed/shuffle/_shuffle.py
@@ -131,7 +131,7 @@ def rearrange_by_column_p2p(
     )
     name = f"shuffle_p2p-{token}"

-    disk: bool = dask.config.get("distributed.p2p.disk")
+    disk: bool = dask.config.get("distributed.p2p.storage.disk")

     layer = P2PShuffleLayer(
         name,

diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py
index c32fca4b71..078cd34b30 100644
--- a/distributed/shuffle/_worker_plugin.py
+++ b/distributed/shuffle/_worker_plugin.py
@@ -6,6 +6,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from typing import TYPE_CHECKING, Any, overload

+import dask
 from dask.context import thread_state
 from dask.typing import Key
 from dask.utils import parse_bytes
@@ -279,14 +280,19 @@ def setup(self, worker: Worker) -> None:
         # Initialize
         self.worker = worker
         self.shuffle_runs = _ShuffleRunManager(self)
+        comm_limit = parse_bytes(dask.config.get("distributed.p2p.comm.buffer"))
         self.memory_limiter_comms = ResourceLimiter(
-            parse_bytes("100 MiB"), metrics_label="p2p-comms-limiter"
+            comm_limit, metrics_label="p2p-comms-limiter"
         )
+        storage_limit = parse_bytes(dask.config.get("distributed.p2p.storage.buffer"))
         self.memory_limiter_disk = ResourceLimiter(
-            parse_bytes("1 GiB"), metrics_label="p2p-disk-limiter"
+            storage_limit, metrics_label="p2p-disk-limiter"
         )
         self.closed = False
-        self._executor = ThreadPoolExecutor(self.worker.state.nthreads)
+        nthreads = (
+            dask.config.get("distributed.p2p.threads") or self.worker.state.nthreads
+        )
+        self._executor = ThreadPoolExecutor(nthreads)

     def __str__(self) -> str:
         return f"ShuffleWorkerPlugin on {self.worker.address}"
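A detail worth noting in setup() above: because distributed.p2p.threads defaults to null, the `or` expression falls back to the worker's thread count (an explicit 0 would fall back as well). A small sketch of just that resolution logic, with worker_nthreads as an illustrative stand-in for self.worker.state.nthreads:

    import dask

    worker_nthreads = 4  # illustrative stand-in

    with dask.config.set({"distributed.p2p.threads": None}):
        # null defers to the worker's thread count
        assert (dask.config.get("distributed.p2p.threads") or worker_nthreads) == 4

    with dask.config.set({"distributed.p2p.threads": 2}):
        # an explicit value wins
        assert (dask.config.get("distributed.p2p.threads") or worker_nthreads) == 2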
diff --git a/distributed/shuffle/tests/test_comm_buffer.py b/distributed/shuffle/tests/test_comm_buffer.py
index 36896c547d..73b6758868 100644
--- a/distributed/shuffle/tests/test_comm_buffer.py
+++ b/distributed/shuffle/tests/test_comm_buffer.py
@@ -20,7 +20,11 @@ async def test_basic(tmp_path):
     async def send(address, shards):
         d[address].extend(shards)

-    mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None))
+    mc = CommShardsBuffer(
+        send=send,
+        max_message_size=parse_bytes("2 MiB"),
+        memory_limiter=ResourceLimiter(None),
+    )
     await mc.write({"x": b"0" * 1000, "y": b"1" * 500})
     await mc.write({"x": b"0" * 1000, "y": b"1" * 500})

@@ -37,7 +41,11 @@ async def test_exceptions(tmp_path):
     async def send(address, shards):
         raise Exception(123)

-    mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None))
+    mc = CommShardsBuffer(
+        send=send,
+        max_message_size=parse_bytes("2 MiB"),
+        memory_limiter=ResourceLimiter(None),
+    )
     await mc.write({"x": b"0" * 1000, "y": b"1" * 500})

     while not mc._exception:
@@ -64,7 +72,10 @@ async def send(address, shards):
             sending_first.set()

     mc = CommShardsBuffer(
-        send=send, concurrency_limit=1, memory_limiter=ResourceLimiter(None)
+        send=send,
+        max_message_size=parse_bytes("2 MiB"),
+        concurrency_limit=1,
+        memory_limiter=ResourceLimiter(None),
     )
     await mc.write({"x": b"0", "y": b"1"})
     await mc.write({"x": b"0", "y": b"1"})
@@ -95,7 +106,9 @@ async def send(address, shards):
     nshards = 10
     nputs = 20
     comm_buffer = CommShardsBuffer(
-        send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB"))
+        send=send,
+        max_message_size=parse_bytes("2 MiB"),
+        memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
     )
     payload = {
         x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)
@@ -136,7 +149,9 @@ async def send(address, shards):
     nshards = 10
     nputs = 20
     comm_buffer = CommShardsBuffer(
-        send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB"))
+        send=send,
+        max_message_size=parse_bytes("2 MiB"),
+        memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
     )
     payload = {
         x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)

diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py
index d74b553698..6fa2117447 100644
--- a/distributed/shuffle/tests/test_merge.py
+++ b/distributed/shuffle/tests/test_merge.py
@@ -180,7 +180,7 @@ async def test_merge(c, s, a, b, how, disk):
     b = dd.repartition(B, [0, 2, 5])

     with dask.config.set({"dataframe.shuffle.method": "p2p"}):
-        with dask.config.set({"distributed.p2p.disk": disk}):
+        with dask.config.set({"distributed.p2p.storage.disk": disk}):
             joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
             res = await c.compute(joined)
             assert_eq(

diff --git a/distributed/shuffle/tests/test_rechunk.py b/distributed/shuffle/tests/test_rechunk.py
index 4744dfa6dc..661a9f418b 100644
--- a/distributed/shuffle/tests/test_rechunk.py
+++ b/distributed/shuffle/tests/test_rechunk.py
@@ -274,7 +274,7 @@ async def test_rechunk_2d(c, s, *ws, disk):
     a = np.random.default_rng().uniform(0, 1, 300).reshape((10, 30))
     x = da.from_array(a, chunks=((1, 2, 3, 4), (5,) * 6))
     new = ((5, 5), (15,) * 2)
-    with dask.config.set({"distributed.p2p.disk": disk}):
+    with dask.config.set({"distributed.p2p.storage.disk": disk}):
         x2 = rechunk(x, chunks=new, method="p2p")
     assert x2.chunks == new
     assert np.all(await c.compute(x2) == a)
@@ -293,7 +293,7 @@ async def test_rechunk_4d(c, s, *ws, disk):
     a = np.random.default_rng().uniform(0, 1, 10000).reshape((10,) * 4)
     x = da.from_array(a, chunks=old)
     new = ((10,),) * 4
-    with dask.config.set({"distributed.p2p.disk": disk}):
+    with dask.config.set({"distributed.p2p.storage.disk": disk}):
         x2 = rechunk(x, chunks=new, method="p2p")
     assert x2.chunks == new
     await c.compute(x2)
@@ -1200,7 +1200,7 @@ async def test_preserve_writeable_flag(c, s, a, b):
     assert out.tolist() == [True, True]


-@gen_cluster(client=True, config={"distributed.p2p.disk": False})
+@gen_cluster(client=True, config={"distributed.p2p.storage.disk": False})
 async def test_rechunk_in_memory_shards_dont_share_buffer(c, s, a, b):
     """Test that, if two shards are sent in the same RPC call and they contribute to
     different output chunks, downstream tasks don't need to consume all output chunks in

diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 0d557c8801..7785c4d2e6 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -195,7 +195,7 @@ async def test_basic_integration(c, s, a, b, npartitions, disk):
         freq="10 s",
     )
     with dask.config.set(
-        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": disk}
     ):
         shuffled = dd.shuffle.shuffle(df, "x", npartitions=npartitions)
     if npartitions is None:
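Note that PATCH 1 removed the default value of concurrency_limit from CommShardsBuffer (the value now flows in from distributed.p2p.comm.concurrency via _core.py) but only added max_message_size to the test constructors; the follow-up below completes the job. For reference, a sketch of a fully spelled-out construction as the updated tests perform it:

    from dask.utils import parse_bytes

    from distributed.shuffle._comms import CommShardsBuffer
    from distributed.shuffle._limiter import ResourceLimiter

    async def send(address, shards):
        ...  # deliver shards to the given address

    mc = CommShardsBuffer(
        send=send,
        max_message_size=parse_bytes("2 MiB"),
        memory_limiter=ResourceLimiter(None),  # None disables back pressure
        concurrency_limit=10,
    )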
From c97e40539d6855e478fe0a34472a761bfbaacd2e Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 16 Oct 2024 15:41:06 +0200
Subject: [PATCH 2/4] Fix comm buffer tests

---
 distributed/shuffle/tests/test_comm_buffer.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/distributed/shuffle/tests/test_comm_buffer.py b/distributed/shuffle/tests/test_comm_buffer.py
index 1c994f1702..70bc8e6daf 100644
--- a/distributed/shuffle/tests/test_comm_buffer.py
+++ b/distributed/shuffle/tests/test_comm_buffer.py
@@ -24,6 +24,7 @@ async def send(address, shards):
         send=send,
         max_message_size=parse_bytes("2 MiB"),
         memory_limiter=ResourceLimiter(None),
+        concurrency_limit=10,
     )
     await mc.write({"x": b"0" * 1000, "y": b"1" * 500})
     await mc.write({"x": b"0" * 1000, "y": b"1" * 500})
@@ -45,6 +46,7 @@ async def send(address, shards):
         send=send,
         max_message_size=parse_bytes("2 MiB"),
         memory_limiter=ResourceLimiter(None),
+        concurrency_limit=10,
     )
     await mc.write({"x": b"0" * 1000, "y": b"1" * 500})

@@ -110,6 +112,7 @@ async def send(address, shards):
         send=send,
         max_message_size=parse_bytes("2 MiB"),
         memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
+        concurrency_limit=10,
     )
     payload = {
         x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)
@@ -154,6 +157,7 @@ async def send(address, shards):
         send=send,
         max_message_size=parse_bytes("2 MiB"),
         memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
+        concurrency_limit=10,
     )
     payload = {
         x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)

From 84adcb39f6a21b0124faef05be5ca54799921cdf Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 16 Oct 2024 15:43:01 +0200
Subject: [PATCH 3/4] Fix merge

---
 distributed/shuffle/_rechunk.py           | 2 +-
 distributed/shuffle/tests/test_shuffle.py | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py
index 15e3ea1d78..f62b375c0c 100644
--- a/distributed/shuffle/_rechunk.py
+++ b/distributed/shuffle/_rechunk.py
@@ -237,7 +237,7 @@ def rechunk_p2p(
     token = tokenize(x, chunks)
     name = rechunk_name(token)
-    disk: bool = dask.config.get("distributed.p2p.disk")
+    disk: bool = dask.config.get("distributed.p2p.storage.disk")

     layer = P2PRechunkLayer(
         name=name,

diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 4d7cb41224..1856d87f8d 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -2055,7 +2055,7 @@ def _write_frames(self, frames, id):
         freq="10 s",
     )
     with dask.config.set(
-        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": True}
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": True}
     ):
         shuffled = df.shuffle("x", npartitions=10)
     with pytest.raises(P2POutOfDiskError, match="out of available disk space"):
@@ -2861,7 +2861,7 @@ def make_partition(partition_id, size):

     df = dd.from_map(make_partition, np.arange(19), args=(250,))
     with dask.config.set(
-        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": disk}
     ):
         shuffled = df.shuffle("b")
     result, expected = await c.compute([shuffled, df], sync=True)
@@ -2882,7 +2882,7 @@ async def test_drop_duplicates_stable_ordering(c, s, a, b, keep, disk):
     df = dask.datasets.timeseries()

     with dask.config.set(
-        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": disk}
     ):
         result, expected = await c.compute(
             [
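With the rename mapping from PATCH 1 in place, the key move is backwards compatible. A quick illustrative check of both the new default and the old-key redirect (assumes this series is applied; using the old key typically emits a deprecation warning):

    import dask
    import distributed  # noqa: F401  # registers distributed's config defaults

    # New canonical key, picking up the default from distributed.yaml:
    assert dask.config.get("distributed.p2p.storage.disk") is True

    # The old key is redirected to the new location:
    with dask.config.set({"distributed.p2p.disk": False}):
        assert dask.config.get("distributed.p2p.storage.disk") is False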
From 1ac1de9b184888c277bf3aa32632f06a32665cb5 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 16 Oct 2024 15:44:21 +0200
Subject: [PATCH 4/4] Minor

---
 distributed/shuffle/_merge.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py
index 0fb403a3f8..345fd3c6b8 100644
--- a/distributed/shuffle/_merge.py
+++ b/distributed/shuffle/_merge.py
@@ -108,7 +108,7 @@ def hash_join_p2p(
     lhs = _calculate_partitions(lhs, left_on, npartitions)
     rhs = _calculate_partitions(rhs, right_on, npartitions)
     merge_name = "hash-join-" + tokenize(lhs, rhs, **merge_kwargs)
-    disk: bool = dask.config.get("distributed.p2p.disk")
+    disk: bool = dask.config.get("distributed.p2p.storage.disk")
     join_layer = HashJoinP2PLayer(
         name=merge_name,
         name_input_left=lhs._name,
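Taken together, the series leaves user-facing behavior unchanged unless one of the new keys is set. An end-to-end sketch of a P2P join with disk storage disabled, mirroring the tests above (made-up data; assumes an active distributed.Client, since P2P requires the distributed scheduler):

    import dask
    import dask.dataframe as dd

    df1 = dd.from_dict({"a": range(100), "b": range(100)}, npartitions=10)
    df2 = dd.from_dict({"a": range(100), "c": range(100)}, npartitions=10)

    with dask.config.set(
        {"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": False}
    ):
        joined = dd.merge(df1, df2, on="a", how="inner")
        result = joined.compute()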