Commit 8d42f27

Merge pull request #399 from jakirkham/add_map_overlap_bench
Add benchmark for `da.map_overlap`
2 parents 2fa52fe + 537f551 commit 8d42f27
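
For context, `da.map_overlap` applies a function to each chunk of a Dask array after exchanging a halo of `depth` elements with neighboring chunks, which is what lets a stencil such as the mean filter benchmarked below run chunk-wise without seams at chunk boundaries. A minimal sketch of the primitive (the array shape and the doubling function are illustrative, not taken from the benchmark):

    import dask.array as da

    x = da.random.random((100, 100), chunks=(50, 50))
    # Each chunk is padded with a 1-element halo from its neighbors before the
    # function is applied; the halo is trimmed from the result afterwards.
    y = x.map_overlap(lambda block: 2 * block, depth=1, boundary="reflect")
    y.compute()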

1 file changed: +194 -0
@@ -0,0 +1,194 @@
import asyncio
from collections import defaultdict
from time import perf_counter as clock

import cupy as cp
import numpy as np
from cupyx.scipy.ndimage.filters import convolve as cp_convolve
from scipy.ndimage import convolve as sp_convolve

from dask import array as da
from dask.distributed import Client, performance_report, wait
from dask.utils import format_bytes, format_time, parse_bytes

from dask_cuda.benchmarks.utils import (
    get_cluster_options,
    get_scheduler_workers,
    parse_benchmark_args,
    setup_memory_pool,
)

def mean_filter(a, shape):
    # Build a uniform kernel and convolve; dispatch to CuPy's convolve for
    # GPU arrays and SciPy's for CPU arrays.
    a_k = np.full_like(a, 1.0 / np.prod(shape), shape=shape)
    if isinstance(a, cp.ndarray):
        return cp_convolve(a, a_k)
    else:
        return sp_convolve(a, a_k)

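# As an aside on the dispatch above: the same filter serves both array
# modules, so the benchmark body below is identical for the CPU and GPU
# paths. A quick sketch of calling it directly (the 5x5 input and 3x3
# kernel shape are illustrative):
#
#     a = np.random.random((5, 5))
#     cpu_out = mean_filter(a, (3, 3))              # dispatches to SciPy
#     gpu_out = mean_filter(cp.asarray(a), (3, 3))  # dispatches to CuPy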
async def _run(client, args):
    # Create a simple random array
    if args.type == "gpu":
        rs = da.random.RandomState(RandomState=cp.random.RandomState)
    else:
        rs = da.random.RandomState(RandomState=np.random.RandomState)
    x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
    ks = 2 * (2 * args.kernel_size + 1,)  # kernel shape: (2*k+1, 2*k+1)
    await wait(x)

    # Execute the operations to benchmark
    if args.profile is not None:
        async with performance_report(filename=args.profile):
            t1 = clock()
            await client.compute(x.map_overlap(mean_filter, args.kernel_size, shape=ks))
            took = clock() - t1
    else:
        t1 = clock()
        await client.compute(x.map_overlap(mean_filter, args.kernel_size, shape=ks))
        took = clock() - t1

    return (took, x.npartitions)

async def run(args):
    cluster_options = get_cluster_options(args)
    Cluster = cluster_options["class"]
    cluster_args = cluster_options["args"]
    cluster_kwargs = cluster_options["kwargs"]
    scheduler_addr = cluster_options["scheduler_addr"]

    async with Cluster(*cluster_args, **cluster_kwargs, asynchronous=True) as cluster:
        if args.multi_node:
            import time

            # Allow some time for workers to start and connect to scheduler
            # TODO: make this a command-line argument?
            time.sleep(15)

        # Use the scheduler address with an SSHCluster rather than the cluster
        # object, otherwise we can't shut it down.
        async with Client(
            scheduler_addr if args.multi_node else cluster, asynchronous=True
        ) as client:
            scheduler_workers = await client.run_on_scheduler(get_scheduler_workers)

            await client.run(setup_memory_pool, disable_pool=args.no_rmm_pool)
            # Create an RMM pool on the scheduler due to occasional deserialization
            # of CUDA objects. May cause issues with InfiniBand otherwise.
            await client.run_on_scheduler(
                setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool
            )

            took_list = []
            for i in range(args.runs):
                took_list.append(await _run(client, args))

            # Collect, aggregate, and print peer-to-peer bandwidths
            incoming_logs = await client.run(
                lambda dask_worker: dask_worker.incoming_transfer_log
            )
            bandwidths = defaultdict(list)
            total_nbytes = defaultdict(list)
            for k, L in incoming_logs.items():
                for d in L:
                    if d["total"] >= args.ignore_size:
                        bandwidths[k, d["who"]].append(d["bandwidth"])
                        total_nbytes[k, d["who"]].append(d["total"])

            bandwidths = {
                (scheduler_workers[w1].name, scheduler_workers[w2].name,): [
                    "%s/s" % format_bytes(x) for x in np.quantile(v, [0.25, 0.50, 0.75])
                ]
                for (w1, w2), v in bandwidths.items()
            }
            total_nbytes = {
                (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(
                    sum(nb)
                )
                for (w1, w2), nb in total_nbytes.items()
            }

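            # The reduction above collapses per-transfer logs into quantile
            # summaries keyed by (source, destination) worker names. A toy
            # run of the same reduction, with one fabricated log entry, for
            # illustration:
            #
            #     logs = {"a": [{"who": "b", "total": 2 ** 21, "bandwidth": 1e9}] * 3}
            #     bw = defaultdict(list)
            #     for k, entries in logs.items():
            #         for d in entries:
            #             if d["total"] >= 2 ** 20:
            #                 bw[k, d["who"]].append(d["bandwidth"])
            #     # bw == {("a", "b"): [1e9, 1e9, 1e9]}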
print("Roundtrip benchmark")
113+
print("--------------------------")
114+
print(f"Size | {args.size}*{args.size}")
115+
print(f"Chunk-size | {args.chunk_size}")
116+
print(f"Ignore-size | {format_bytes(args.ignore_size)}")
117+
print(f"Protocol | {args.protocol}")
118+
print(f"Device(s) | {args.devs}")
119+
print("==========================")
120+
print("Wall-clock | npartitions")
121+
print("--------------------------")
122+
for (took, npartitions) in took_list:
123+
t = format_time(took)
124+
t += " " * (11 - len(t))
125+
print(f"{t} | {npartitions}")
126+
print("==========================")
127+
print("(w1,w2) | 25% 50% 75% (total nbytes)")
128+
print("--------------------------")
129+
for (d1, d2), bw in sorted(bandwidths.items()):
130+
fmt = (
131+
"(%s,%s) | %s %s %s (%s)"
132+
if args.multi_node
133+
else "(%02d,%02d) | %s %s %s (%s)"
134+
)
135+
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
136+
137+
# An SSHCluster will not automatically shut down, we have to
138+
# ensure it does.
139+
if args.multi_node:
140+
await client.shutdown()
141+
142+
def parse_args():
    special_args = [
        {
            "name": ["-s", "--size",],
            "default": "10000",
            "metavar": "n",
            "type": int,
            "help": "The size n in n^2 (default 10000)",
        },
        {
            "name": ["-t", "--type",],
            "choices": ["cpu", "gpu"],
            "default": "gpu",
            "type": str,
            "help": "Use GPU or CPU arrays",
        },
        {
            "name": ["-c", "--chunk-size",],
            "default": "128 MiB",
            "metavar": "nbytes",
            "type": str,
            "help": "Chunk size (default '128 MiB')",
        },
        {
            "name": ["-k", "--kernel-size",],
            "default": "1",
            "metavar": "k",
            "type": int,
            "help": "Kernel size, 2*k+1, in each dimension (default 1)",
        },
        {
            "name": "--ignore-size",
            "default": "1 MiB",
            "metavar": "nbytes",
            "type": parse_bytes,
            "help": "Ignore messages smaller than this (default '1 MiB')",
        },
        {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",},
    ]

    return parse_benchmark_args(
        description="Map overlap on LocalCUDACluster benchmark", args_list=special_args
    )

def main():
    args = parse_args()
    asyncio.get_event_loop().run_until_complete(run(args))


if __name__ == "__main__":
    main()
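
A hypothetical invocation of the new benchmark (the changed file's name is not shown on this page; `local_cupy_map_overlap.py` is an assumption):

    python local_cupy_map_overlap.py -s 10000 -c "128 MiB" -k 1 --runs 3

Besides the options defined in `parse_args` above, the script relies on flags supplied by the shared `parse_benchmark_args` helper, e.g. `args.protocol`, `args.devs`, `args.profile`, `args.multi_node`, and `args.no_rmm_pool`.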
