From 3c0a12afa39662e0ed748f50b4f8899d7d250659 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 8 Mar 2021 15:12:16 -0800 Subject: [PATCH 01/11] Add multiprocess test to perform all-to-all ep creation --- tests/test_multiple_processes_all_to_all.py | 210 ++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 tests/test_multiple_processes_all_to_all.py diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py new file mode 100644 index 000000000..e3fc103b7 --- /dev/null +++ b/tests/test_multiple_processes_all_to_all.py @@ -0,0 +1,210 @@ +import asyncio +import multiprocessing +import random +import sys + +import numpy as np +import pytest + +import ucp + +OP_BYTES = 1 +PORT_BYTES = 2 + +OP_NONE = 0 +OP_WORKER_LISTENING = 1 +OP_WORKER_COMPLETED = 2 +OP_CLUSTER_READY = 3 +OP_SHUTDOWN = 4 + + +def generate_op_message(op, port): + op_bytes = op.to_bytes(OP_BYTES, sys.byteorder) + port_bytes = port.to_bytes(PORT_BYTES, sys.byteorder) + return bytearray(b"".join([op_bytes, port_bytes])) + + +def parse_op_message(msg): + op = int.from_bytes(msg[0:OP_BYTES], sys.byteorder) + port = int.from_bytes(msg[OP_BYTES : OP_BYTES + PORT_BYTES], sys.byteorder) + return {"op": op, "port": port} + + +def worker(my_port, monitor_port, all_ports): + ucp.init() + + global cluster_started + cluster_started = False + + async def _worker(my_port, all_ports): + def _register_cluster_started(): + global cluster_started + cluster_started = True + + async def _listener(ep): + op_msg = generate_op_message(OP_NONE, 0) + msg2send = np.arange(10) + msg2recv = np.empty_like(msg2send) + + msgs = [ep.recv(op_msg), ep.send(msg2send), ep.recv(msg2recv)] + await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + op = parse_op_message(op_msg)["op"] + + if op == OP_SHUTDOWN: + await ep.close() + listener.close() + if op == OP_CLUSTER_READY: + _register_cluster_started() + + async def _client(port): + op_msg = generate_op_message(OP_NONE, 0) + msg2send = np.arange(10) + msg2recv = np.empty_like(msg2send) + + ep = await ucp.create_endpoint(ucp.get_address(), port) + msgs = [ep.send(op_msg), ep.recv(msg2send), ep.send(msg2recv)] + await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + await asyncio.sleep(2) + + async def _signal_monitor(monitor_port, my_port, op): + op_msg = generate_op_message(op, my_port) + ack_msg = bytearray(2) + + ep = await ucp.create_endpoint(ucp.get_address(), monitor_port) + msgs = [ep.send(op_msg), ep.recv(ack_msg)] + await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + # Start listener + listener = ucp.create_listener(_listener, port=my_port) + + # Signal monitor that worker is listening + await _signal_monitor(monitor_port, my_port, op=OP_WORKER_LISTENING) + + while not cluster_started: + await asyncio.sleep(0.1) + + # Create endpoints to all other workers + clients = [] + for port in all_ports: + clients.append(_client(port)) + await asyncio.gather(*clients, loop=asyncio.get_event_loop()) + + # Signal monitor that worker is completed + await _signal_monitor(monitor_port, my_port, op=OP_WORKER_COMPLETED) + + # Wait for a shutdown signal from monitor + try: + while not listener.closed(): + await asyncio.sleep(0.1) + except ucp.UCXCloseError: + pass + + asyncio.get_event_loop().run_until_complete(_worker(my_port, all_ports)) + + +def monitor(monitor_port, worker_ports): + ucp.init() + + listening_worker_ports = [] + completed_worker_ports = [] + + async def _monitor(monitor_port, worker_ports): + def 
_register(op, port): + if op == OP_WORKER_LISTENING: + listening_worker_ports.append(port) + elif op == OP_WORKER_COMPLETED: + completed_worker_ports.append(port) + + async def _listener(ep): + op_msg = generate_op_message(OP_NONE, 0) + ack_msg = bytearray(int(888).to_bytes(2, sys.byteorder)) + + # Sending an ack_msg prevents the other ep from closing too + # early, ultimately leading this process to hang. + msgs = [ep.recv(op_msg), ep.send(ack_msg)] + await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + # worker_op == 0 for started, worker_op == 1 for completed + op_msg = parse_op_message(op_msg) + worker_op = op_msg["op"] + worker_port = op_msg["port"] + + _register(worker_op, worker_port) + + async def _send_op(op, port): + op_msg = generate_op_message(op, port) + msg2send = np.arange(10) + msg2recv = np.empty_like(msg2send) + + ep = await ucp.create_endpoint(ucp.get_address(), port) + msgs = [ep.send(op_msg), ep.send(msg2send), ep.recv(msg2recv)] + await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + # Start monitor's listener + listener = ucp.create_listener(_listener, port=monitor_port) + + # Wait until all workers signal they are listening + while len(listening_worker_ports) != len(worker_ports): + await asyncio.sleep(0.1) + + # Send cluster ready message to all workers + ready_signals = [] + for port in listening_worker_ports: + ready_signals.append(_send_op(OP_CLUSTER_READY, port)) + await asyncio.gather(*ready_signals, loop=asyncio.get_event_loop()) + + # Wait until all workers signal completion + while len(completed_worker_ports) != len(worker_ports): + await asyncio.sleep(0.1) + + # Send shutdown message to all workers + close = [] + for port in completed_worker_ports: + close.append(_send_op(OP_SHUTDOWN, port)) + await asyncio.gather(*close, loop=asyncio.get_event_loop()) + + listener.close() + + asyncio.get_event_loop().run_until_complete(_monitor(monitor_port, worker_ports)) + + +@pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) +def test_send_recv_cu(num_workers): + # One additional port for monitor + num_ports = num_workers + 1 + + ports = set() + while len(ports) != num_ports: + missing_ports = num_ports - len(ports) + ports = ports.union( + [random.randint(13000, 23000) for n in range(missing_ports)] + ) + ports = list(ports) + + monitor_port = ports[0] + worker_ports = ports[1:] + + ctx = multiprocessing.get_context("spawn") + + monitor_process = ctx.Process( + name="monitor", target=monitor, args=[monitor_port, worker_ports] + ) + monitor_process.start() + + worker_processes = [] + for port in worker_ports: + worker_process = ctx.Process( + name="worker", target=worker, args=[port, monitor_port, worker_ports] + ) + worker_process.start() + worker_processes.append(worker_process) + + for worker_process in worker_processes: + worker_process.join() + + monitor_process.join() + + assert worker_process.exitcode == 0 + assert monitor_process.exitcode == 0 From 35664da55fb0d26c1381425f31f9bdaf6fa1bb4b Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 10 Mar 2021 07:52:42 -0800 Subject: [PATCH 02/11] Add support for persistent endpoints in multiprocess all_to_all test --- tests/test_multiple_processes_all_to_all.py | 152 ++++++++++++++++---- 1 file changed, 128 insertions(+), 24 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index e3fc103b7..e5618fb13 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -17,6 +17,8 
@@ OP_CLUSTER_READY = 3 OP_SHUTDOWN = 4 +PersistentEndpoints = False + def generate_op_message(op, port): op_bytes = op.to_bytes(OP_BYTES, sys.byteorder) @@ -30,9 +32,28 @@ def parse_op_message(msg): return {"op": op, "port": port} +async def create_endpoint_retry(my_port, remote_port, my_task, remote_task): + while True: + try: + ep = await ucp.create_endpoint(ucp.get_address(), remote_port) + return ep + except ucp.exceptions.UCXCanceled as e: + print( + "%s[%d]->%s[%d] Failed: %s" + % (my_task, my_port, remote_task, remote_port, e), + flush=True, + ) + await asyncio.sleep(0.1) + + def worker(my_port, monitor_port, all_ports): ucp.init() + listener_eps = [] + + global listener_monitor_ep + listener_monitor_ep = None + global cluster_started cluster_started = False @@ -41,7 +62,11 @@ def _register_cluster_started(): global cluster_started cluster_started = True - async def _listener(ep): + async def _close_endpoints(): + for ep in listener_eps: + await ep.close() + + async def _listener(ep, cache_ep=False): op_msg = generate_op_message(OP_NONE, 0) msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) @@ -51,48 +76,97 @@ async def _listener(ep): op = parse_op_message(op_msg)["op"] + if cache_ep and PersistentEndpoints: + if op == OP_NONE: + listener_eps.append(ep) + else: + global listener_monitor_ep + listener_monitor_ep = ep + if op == OP_SHUTDOWN: await ep.close() listener.close() if op == OP_CLUSTER_READY: _register_cluster_started() - async def _client(port): + async def _listener_cb(ep): + await _listener(ep, cache_ep=True) + + async def _client(port, ep=None): op_msg = generate_op_message(OP_NONE, 0) msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) - ep = await ucp.create_endpoint(ucp.get_address(), port) - msgs = [ep.send(op_msg), ep.recv(msg2send), ep.send(msg2recv)] + if ep is None: + ep = await create_endpoint_retry(my_port, port, "Worker", "Worker") + msgs = [ep.send(op_msg), ep.recv(msg2recv), ep.send(msg2send)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) - await asyncio.sleep(2) - - async def _signal_monitor(monitor_port, my_port, op): + async def _signal_monitor(monitor_port, my_port, op, ep=None): op_msg = generate_op_message(op, my_port) ack_msg = bytearray(2) - ep = await ucp.create_endpoint(ucp.get_address(), monitor_port) + if ep is None: + #ep = await ucp.create_endpoint(ucp.get_address(), monitor_port) + ep = await create_endpoint_retry(my_port, monitor_port, "Worker", "Monitor") msgs = [ep.send(op_msg), ep.recv(ack_msg)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) # Start listener - listener = ucp.create_listener(_listener, port=my_port) + listener = ucp.create_listener(_listener_cb, port=my_port) # Signal monitor that worker is listening - await _signal_monitor(monitor_port, my_port, op=OP_WORKER_LISTENING) + monitor_ep = None + if PersistentEndpoints: + monitor_ep = await create_endpoint_retry( + my_port, monitor_port, "Worker", "Monitor" + ) + await _signal_monitor( + monitor_port, my_port, op=OP_WORKER_LISTENING, ep=monitor_ep + ) + else: + await _signal_monitor(monitor_port, my_port, op=OP_WORKER_LISTENING) while not cluster_started: await asyncio.sleep(0.1) - # Create endpoints to all other workers - clients = [] - for port in all_ports: - clients.append(_client(port)) - await asyncio.gather(*clients, loop=asyncio.get_event_loop()) + eps = [] + if PersistentEndpoints: + client_tasks = [] + # Create endpoints to all other workers + for remote_port in all_ports: + if remote_port == my_port: + continue + ep = await 
create_endpoint_retry( + my_port, remote_port, "Worker", "Worker" + ) + eps.append(ep) + client_tasks.append(_client(remote_port, ep)) + await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) + + # Wait until listener_eps have all been cached + while len(listener_eps) != len(all_ports) - 1: + await asyncio.sleep(0.1) + else: + # Create endpoints to all other workers + client_tasks = [] + for port in all_ports: + if port == my_port: + continue + client_tasks.append(_client(port)) + await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) # Signal monitor that worker is completed - await _signal_monitor(monitor_port, my_port, op=OP_WORKER_COMPLETED) + if PersistentEndpoints: + await _signal_monitor( + monitor_port, my_port, op=OP_WORKER_COMPLETED, ep=monitor_ep + ) + else: + await _signal_monitor(monitor_port, my_port, op=OP_WORKER_COMPLETED) + + # Wait for closing signal + if PersistentEndpoints: + await _listener(listener_monitor_ep) # Wait for a shutdown signal from monitor try: @@ -107,6 +181,7 @@ async def _signal_monitor(monitor_port, my_port, op): def monitor(monitor_port, worker_ports): ucp.init() + listener_eps = [] listening_worker_ports = [] completed_worker_ports = [] @@ -117,7 +192,10 @@ def _register(op, port): elif op == OP_WORKER_COMPLETED: completed_worker_ports.append(port) - async def _listener(ep): + async def _listener(ep, cache_ep=True): + if cache_ep and PersistentEndpoints: + listener_eps.append(ep) + op_msg = generate_op_message(OP_NONE, 0) ack_msg = bytearray(int(888).to_bytes(2, sys.byteorder)) @@ -126,35 +204,58 @@ async def _listener(ep): msgs = [ep.recv(op_msg), ep.send(ack_msg)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) - # worker_op == 0 for started, worker_op == 1 for completed op_msg = parse_op_message(op_msg) worker_op = op_msg["op"] worker_port = op_msg["port"] _register(worker_op, worker_port) - async def _send_op(op, port): + async def _listener_cb(ep): + await _listener(ep, cache_ep=True) + + async def _send_op(op, port, ep=None): op_msg = generate_op_message(op, port) msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) - ep = await ucp.create_endpoint(ucp.get_address(), port) + if ep is None: + ep = await create_endpoint_retry(monitor_port, port, "Monitor", "Monitor") msgs = [ep.send(op_msg), ep.send(msg2send), ep.recv(msg2recv)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) # Start monitor's listener - listener = ucp.create_listener(_listener, port=monitor_port) + listener = ucp.create_listener(_listener_cb, port=monitor_port) # Wait until all workers signal they are listening while len(listening_worker_ports) != len(worker_ports): await asyncio.sleep(0.1) - # Send cluster ready message to all workers + # Create persistent endpoints to all workers + worker_eps = {} + if PersistentEndpoints: + for remote_port in worker_ports: + worker_eps[remote_port] = await create_endpoint_retry( + monitor_port, remote_port, "Monitor", "Worker" + ) + + # Send shutdown message to all workers ready_signals = [] for port in listening_worker_ports: - ready_signals.append(_send_op(OP_CLUSTER_READY, port)) + if PersistentEndpoints: + ready_signals.append(_send_op(OP_CLUSTER_READY, port, worker_eps[port])) + else: + ready_signals.append(_send_op(OP_CLUSTER_READY, port)) await asyncio.gather(*ready_signals, loop=asyncio.get_event_loop()) + # When using persistent endpoints, we need to wait on previously + # created endpoints for completion signal + if PersistentEndpoints: + listener_tasks = [] + for listener_ep in 
listener_eps: + listener_tasks.append(_listener(listener_ep)) + + await asyncio.gather(*listener_tasks, loop=asyncio.get_event_loop()) + # Wait until all workers signal completion while len(completed_worker_ports) != len(worker_ports): await asyncio.sleep(0.1) @@ -162,7 +263,10 @@ async def _send_op(op, port): # Send shutdown message to all workers close = [] for port in completed_worker_ports: - close.append(_send_op(OP_SHUTDOWN, port)) + if PersistentEndpoints: + close.append(_send_op(OP_SHUTDOWN, port, ep=worker_eps[port])) + else: + close.append(_send_op(OP_SHUTDOWN, port)) await asyncio.gather(*close, loop=asyncio.get_event_loop()) listener.close() From 65f1870c8ba5cf32402a816d371b8260f78f6570 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 10 Mar 2021 09:42:31 -0800 Subject: [PATCH 03/11] Improve closing of endpoints in multiproc all_to_all test --- tests/test_multiple_processes_all_to_all.py | 44 ++++++++++++--------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index e5618fb13..f352f9383 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -1,5 +1,6 @@ import asyncio import multiprocessing +import os import random import sys @@ -17,7 +18,7 @@ OP_CLUSTER_READY = 3 OP_SHUTDOWN = 4 -PersistentEndpoints = False +PersistentEndpoints = True def generate_op_message(op, port): @@ -49,6 +50,7 @@ async def create_endpoint_retry(my_port, remote_port, my_task, remote_task): def worker(my_port, monitor_port, all_ports): ucp.init() + eps = [] listener_eps = [] global listener_monitor_ep @@ -62,11 +64,9 @@ def _register_cluster_started(): global cluster_started cluster_started = True - async def _close_endpoints(): - for ep in listener_eps: - await ep.close() - async def _listener(ep, cache_ep=False): + global listener_monitor_ep + op_msg = generate_op_message(OP_NONE, 0) msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) @@ -76,19 +76,20 @@ async def _listener(ep, cache_ep=False): op = parse_op_message(op_msg)["op"] + if op == OP_SHUTDOWN: + while not listener_monitor_ep.closed(): + await asyncio.sleep(0.1) + listener.close() + return + if op == OP_CLUSTER_READY: + _register_cluster_started() + if cache_ep and PersistentEndpoints: if op == OP_NONE: listener_eps.append(ep) else: - global listener_monitor_ep listener_monitor_ep = ep - if op == OP_SHUTDOWN: - await ep.close() - listener.close() - if op == OP_CLUSTER_READY: - _register_cluster_started() - async def _listener_cb(ep): await _listener(ep, cache_ep=True) @@ -107,8 +108,9 @@ async def _signal_monitor(monitor_port, my_port, op, ep=None): ack_msg = bytearray(2) if ep is None: - #ep = await ucp.create_endpoint(ucp.get_address(), monitor_port) - ep = await create_endpoint_retry(my_port, monitor_port, "Worker", "Monitor") + ep = await create_endpoint_retry( + my_port, monitor_port, "Worker", "Monitor" + ) msgs = [ep.send(op_msg), ep.recv(ack_msg)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) @@ -130,7 +132,6 @@ async def _signal_monitor(monitor_port, my_port, op, ep=None): while not cluster_started: await asyncio.sleep(0.1) - eps = [] if PersistentEndpoints: client_tasks = [] # Create endpoints to all other workers @@ -181,7 +182,7 @@ async def _signal_monitor(monitor_port, my_port, op, ep=None): def monitor(monitor_port, worker_ports): ucp.init() - listener_eps = [] + listener_eps = {} listening_worker_ports = [] completed_worker_ports = [] 
@@ -194,7 +195,7 @@ def _register(op, port): async def _listener(ep, cache_ep=True): if cache_ep and PersistentEndpoints: - listener_eps.append(ep) + listener_eps[ep.uid] = ep op_msg = generate_op_message(OP_NONE, 0) ack_msg = bytearray(int(888).to_bytes(2, sys.byteorder)) @@ -219,10 +220,15 @@ async def _send_op(op, port, ep=None): msg2recv = np.empty_like(msg2send) if ep is None: - ep = await create_endpoint_retry(monitor_port, port, "Monitor", "Monitor") + ep = await create_endpoint_retry( + monitor_port, port, "Monitor", "Monitor" + ) msgs = [ep.send(op_msg), ep.send(msg2send), ep.recv(msg2recv)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + if op == OP_SHUTDOWN: + await ep.close() + # Start monitor's listener listener = ucp.create_listener(_listener_cb, port=monitor_port) @@ -251,7 +257,7 @@ async def _send_op(op, port, ep=None): # created endpoints for completion signal if PersistentEndpoints: listener_tasks = [] - for listener_ep in listener_eps: + for listener_ep in listener_eps.values(): listener_tasks.append(_listener(listener_ep)) await asyncio.gather(*listener_tasks, loop=asyncio.get_event_loop()) From ba66c92498f5866de03d65eda33db51cbc3def75 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 10 Mar 2021 10:11:54 -0800 Subject: [PATCH 04/11] Do more transfers between worker pairs --- tests/test_multiple_processes_all_to_all.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index f352f9383..a90684337 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -148,6 +148,18 @@ async def _signal_monitor(monitor_port, my_port, op, ep=None): # Wait until listener_eps have all been cached while len(listener_eps) != len(all_ports) - 1: await asyncio.sleep(0.1) + + # Exchange messages with other workers + for i in range(3): + client_tasks = [] + listener_tasks = [] + for ep in eps: + client_tasks.append(_client(remote_port, ep)) + for listener_ep in listener_eps: + listener_tasks.append(_listener(listener_ep)) + + all_tasks = client_tasks + listener_tasks + await asyncio.gather(*all_tasks, loop=asyncio.get_event_loop()) else: # Create endpoints to all other workers client_tasks = [] From b712ce9d99b6a2c7e726be47b5e62d797c5d4ba9 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 10 Mar 2021 14:54:50 -0800 Subject: [PATCH 05/11] Support multiple endpoints per worker in multiproc all_to_all test --- tests/test_multiple_processes_all_to_all.py | 34 ++++++++++++--------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index a90684337..5194d5d3e 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -47,7 +47,7 @@ async def create_endpoint_retry(my_port, remote_port, my_task, remote_task): await asyncio.sleep(0.1) -def worker(my_port, monitor_port, all_ports): +def worker(my_port, monitor_port, all_ports, endpoints_per_worker): ucp.init() eps = [] @@ -133,20 +133,21 @@ async def _signal_monitor(monitor_port, my_port, op, ep=None): await asyncio.sleep(0.1) if PersistentEndpoints: - client_tasks = [] - # Create endpoints to all other workers - for remote_port in all_ports: - if remote_port == my_port: - continue - ep = await create_endpoint_retry( - my_port, remote_port, "Worker", "Worker" - ) - eps.append(ep) - 
client_tasks.append(_client(remote_port, ep)) - await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) + for i in range(endpoints_per_worker): + client_tasks = [] + # Create endpoints to all other workers + for remote_port in all_ports: + if remote_port == my_port: + continue + ep = await create_endpoint_retry( + my_port, remote_port, "Worker", "Worker" + ) + eps.append(ep) + client_tasks.append(_client(remote_port, ep)) + await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) # Wait until listener_eps have all been cached - while len(listener_eps) != len(all_ports) - 1: + while len(listener_eps) != endpoints_per_worker * (len(all_ports) - 1): await asyncio.sleep(0.1) # Exchange messages with other workers @@ -293,7 +294,8 @@ async def _send_op(op, port, ep=None): @pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) -def test_send_recv_cu(num_workers): +@pytest.mark.parametrize("endpoints_per_worker", [20, 80, 320, 640]) +def test_send_recv_cu(num_workers, endpoints_per_worker): # One additional port for monitor num_ports = num_workers + 1 @@ -318,7 +320,9 @@ def test_send_recv_cu(num_workers): worker_processes = [] for port in worker_ports: worker_process = ctx.Process( - name="worker", target=worker, args=[port, monitor_port, worker_ports] + name="worker", + target=worker, + args=[port, monitor_port, worker_ports, endpoints_per_worker], ) worker_process.start() worker_processes.append(worker_process) From aa067315737b4df91d89e2571c2ea15262f39154 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 9 Jun 2021 15:54:17 -0700 Subject: [PATCH 06/11] Remove monitor process in favor of multiprocessing shared memory --- tests/test_multiple_processes_all_to_all.py | 270 +++----------------- 1 file changed, 42 insertions(+), 228 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index 5194d5d3e..922197229 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -1,38 +1,14 @@ import asyncio import multiprocessing -import os -import random -import sys import numpy as np import pytest import ucp -OP_BYTES = 1 -PORT_BYTES = 2 - -OP_NONE = 0 -OP_WORKER_LISTENING = 1 -OP_WORKER_COMPLETED = 2 -OP_CLUSTER_READY = 3 -OP_SHUTDOWN = 4 - PersistentEndpoints = True -def generate_op_message(op, port): - op_bytes = op.to_bytes(OP_BYTES, sys.byteorder) - port_bytes = port.to_bytes(PORT_BYTES, sys.byteorder) - return bytearray(b"".join([op_bytes, port_bytes])) - - -def parse_op_message(msg): - op = int.from_bytes(msg[0:OP_BYTES], sys.byteorder) - port = int.from_bytes(msg[OP_BYTES : OP_BYTES + PORT_BYTES], sys.byteorder) - return {"op": op, "port": port} - - async def create_endpoint_retry(my_port, remote_port, my_task, remote_task): while True: try: @@ -47,107 +23,66 @@ async def create_endpoint_retry(my_port, remote_port, my_task, remote_task): await asyncio.sleep(0.1) -def worker(my_port, monitor_port, all_ports, endpoints_per_worker): +def worker(signal, ports, lock, worker_num, num_workers, endpoints_per_worker): ucp.init() eps = [] - listener_eps = [] - - global listener_monitor_ep - listener_monitor_ep = None + listener_eps = set() global cluster_started cluster_started = False - async def _worker(my_port, all_ports): + async def _worker(): def _register_cluster_started(): global cluster_started cluster_started = True async def _listener(ep, cache_ep=False): - global listener_monitor_ep - - op_msg = generate_op_message(OP_NONE, 0) msg2send = 
np.arange(10) msg2recv = np.empty_like(msg2send) - msgs = [ep.recv(op_msg), ep.send(msg2send), ep.recv(msg2recv)] + msgs = [ep.send(msg2send), ep.recv(msg2recv)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) - op = parse_op_message(op_msg)["op"] - - if op == OP_SHUTDOWN: - while not listener_monitor_ep.closed(): - await asyncio.sleep(0.1) - listener.close() - return - if op == OP_CLUSTER_READY: - _register_cluster_started() - - if cache_ep and PersistentEndpoints: - if op == OP_NONE: - listener_eps.append(ep) - else: - listener_monitor_ep = ep - async def _listener_cb(ep): + if PersistentEndpoints: + listener_eps.add(ep) await _listener(ep, cache_ep=True) - async def _client(port, ep=None): - op_msg = generate_op_message(OP_NONE, 0) + async def _client(my_port, remote_port, ep=None): msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) if ep is None: ep = await create_endpoint_retry(my_port, port, "Worker", "Worker") - msgs = [ep.send(op_msg), ep.recv(msg2recv), ep.send(msg2send)] - await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) - - async def _signal_monitor(monitor_port, my_port, op, ep=None): - op_msg = generate_op_message(op, my_port) - ack_msg = bytearray(2) - - if ep is None: - ep = await create_endpoint_retry( - my_port, monitor_port, "Worker", "Monitor" - ) - msgs = [ep.send(op_msg), ep.recv(ack_msg)] + msgs = [ep.recv(msg2recv), ep.send(msg2send)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) # Start listener - listener = ucp.create_listener(_listener_cb, port=my_port) + listener = ucp.create_listener(_listener_cb) + with lock: + signal[0] += 1 + ports[worker_num] = listener.port - # Signal monitor that worker is listening - monitor_ep = None - if PersistentEndpoints: - monitor_ep = await create_endpoint_retry( - my_port, monitor_port, "Worker", "Monitor" - ) - await _signal_monitor( - monitor_port, my_port, op=OP_WORKER_LISTENING, ep=monitor_ep - ) - else: - await _signal_monitor(monitor_port, my_port, op=OP_WORKER_LISTENING) - - while not cluster_started: - await asyncio.sleep(0.1) + while signal[0] != num_workers: + pass if PersistentEndpoints: for i in range(endpoints_per_worker): client_tasks = [] # Create endpoints to all other workers - for remote_port in all_ports: - if remote_port == my_port: + for remote_port in list(ports): + if remote_port == listener.port: continue ep = await create_endpoint_retry( - my_port, remote_port, "Worker", "Worker" + listener.port, remote_port, "Worker", "Worker" ) eps.append(ep) - client_tasks.append(_client(remote_port, ep)) + client_tasks.append(_client(listener.port, remote_port, ep)) await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) # Wait until listener_eps have all been cached - while len(listener_eps) != endpoints_per_worker * (len(all_ports) - 1): + while len(listener_eps) != endpoints_per_worker * (num_workers - 1): await asyncio.sleep(0.1) # Exchange messages with other workers @@ -155,32 +90,30 @@ async def _signal_monitor(monitor_port, my_port, op, ep=None): client_tasks = [] listener_tasks = [] for ep in eps: - client_tasks.append(_client(remote_port, ep)) + client_tasks.append(_client(listener.port, remote_port, ep)) for listener_ep in listener_eps: listener_tasks.append(_listener(listener_ep)) all_tasks = client_tasks + listener_tasks await asyncio.gather(*all_tasks, loop=asyncio.get_event_loop()) else: - # Create endpoints to all other workers - client_tasks = [] - for port in all_ports: - if port == my_port: - continue - client_tasks.append(_client(port)) - await 
asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) - - # Signal monitor that worker is completed - if PersistentEndpoints: - await _signal_monitor( - monitor_port, my_port, op=OP_WORKER_COMPLETED, ep=monitor_ep - ) - else: - await _signal_monitor(monitor_port, my_port, op=OP_WORKER_COMPLETED) + for i in range(3): + # Create endpoints to all other workers + client_tasks = [] + for port in list(ports): + if port == listener.port: + continue + client_tasks.append(_client(listener.port, port)) + await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) - # Wait for closing signal - if PersistentEndpoints: - await _listener(listener_monitor_ep) + with lock: + signal[1] += 1 + ports[worker_num] = listener.port + + while signal[1] != num_workers: + pass + + listener.close() # Wait for a shutdown signal from monitor try: @@ -189,140 +122,24 @@ async def _signal_monitor(monitor_port, my_port, op, ep=None): except ucp.UCXCloseError: pass - asyncio.get_event_loop().run_until_complete(_worker(my_port, all_ports)) - - -def monitor(monitor_port, worker_ports): - ucp.init() - - listener_eps = {} - listening_worker_ports = [] - completed_worker_ports = [] - - async def _monitor(monitor_port, worker_ports): - def _register(op, port): - if op == OP_WORKER_LISTENING: - listening_worker_ports.append(port) - elif op == OP_WORKER_COMPLETED: - completed_worker_ports.append(port) - - async def _listener(ep, cache_ep=True): - if cache_ep and PersistentEndpoints: - listener_eps[ep.uid] = ep - - op_msg = generate_op_message(OP_NONE, 0) - ack_msg = bytearray(int(888).to_bytes(2, sys.byteorder)) - - # Sending an ack_msg prevents the other ep from closing too - # early, ultimately leading this process to hang. - msgs = [ep.recv(op_msg), ep.send(ack_msg)] - await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) - - op_msg = parse_op_message(op_msg) - worker_op = op_msg["op"] - worker_port = op_msg["port"] - - _register(worker_op, worker_port) - - async def _listener_cb(ep): - await _listener(ep, cache_ep=True) - - async def _send_op(op, port, ep=None): - op_msg = generate_op_message(op, port) - msg2send = np.arange(10) - msg2recv = np.empty_like(msg2send) - - if ep is None: - ep = await create_endpoint_retry( - monitor_port, port, "Monitor", "Monitor" - ) - msgs = [ep.send(op_msg), ep.send(msg2send), ep.recv(msg2recv)] - await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) - - if op == OP_SHUTDOWN: - await ep.close() - - # Start monitor's listener - listener = ucp.create_listener(_listener_cb, port=monitor_port) - - # Wait until all workers signal they are listening - while len(listening_worker_ports) != len(worker_ports): - await asyncio.sleep(0.1) - - # Create persistent endpoints to all workers - worker_eps = {} - if PersistentEndpoints: - for remote_port in worker_ports: - worker_eps[remote_port] = await create_endpoint_retry( - monitor_port, remote_port, "Monitor", "Worker" - ) - - # Send shutdown message to all workers - ready_signals = [] - for port in listening_worker_ports: - if PersistentEndpoints: - ready_signals.append(_send_op(OP_CLUSTER_READY, port, worker_eps[port])) - else: - ready_signals.append(_send_op(OP_CLUSTER_READY, port)) - await asyncio.gather(*ready_signals, loop=asyncio.get_event_loop()) - - # When using persistent endpoints, we need to wait on previously - # created endpoints for completion signal - if PersistentEndpoints: - listener_tasks = [] - for listener_ep in listener_eps.values(): - listener_tasks.append(_listener(listener_ep)) - - await 
asyncio.gather(*listener_tasks, loop=asyncio.get_event_loop()) - - # Wait until all workers signal completion - while len(completed_worker_ports) != len(worker_ports): - await asyncio.sleep(0.1) - - # Send shutdown message to all workers - close = [] - for port in completed_worker_ports: - if PersistentEndpoints: - close.append(_send_op(OP_SHUTDOWN, port, ep=worker_eps[port])) - else: - close.append(_send_op(OP_SHUTDOWN, port)) - await asyncio.gather(*close, loop=asyncio.get_event_loop()) - - listener.close() - - asyncio.get_event_loop().run_until_complete(_monitor(monitor_port, worker_ports)) + asyncio.get_event_loop().run_until_complete(_worker()) @pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) @pytest.mark.parametrize("endpoints_per_worker", [20, 80, 320, 640]) def test_send_recv_cu(num_workers, endpoints_per_worker): - # One additional port for monitor - num_ports = num_workers + 1 - - ports = set() - while len(ports) != num_ports: - missing_ports = num_ports - len(ports) - ports = ports.union( - [random.randint(13000, 23000) for n in range(missing_ports)] - ) - ports = list(ports) - - monitor_port = ports[0] - worker_ports = ports[1:] - ctx = multiprocessing.get_context("spawn") - monitor_process = ctx.Process( - name="monitor", target=monitor, args=[monitor_port, worker_ports] - ) - monitor_process.start() + signal = ctx.Array("i", [0, 0]) + ports = ctx.Array("i", range(num_workers)) + lock = ctx.Lock() worker_processes = [] - for port in worker_ports: + for worker_num in range(num_workers): worker_process = ctx.Process( name="worker", target=worker, - args=[port, monitor_port, worker_ports, endpoints_per_worker], + args=[signal, ports, lock, worker_num, num_workers, endpoints_per_worker], ) worker_process.start() worker_processes.append(worker_process) @@ -330,7 +147,4 @@ def test_send_recv_cu(num_workers, endpoints_per_worker): for worker_process in worker_processes: worker_process.join() - monitor_process.join() - assert worker_process.exitcode == 0 - assert monitor_process.exitcode == 0 From 68c07a6d173c850c83e1505e0808400d5224ff1d Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 1 Jul 2021 11:39:43 -0700 Subject: [PATCH 07/11] Mark some multiple processes all-to-all tests as slow --- tests/test_multiple_processes_all_to_all.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index 922197229..f2eb45334 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -125,9 +125,7 @@ async def _client(my_port, remote_port, ep=None): asyncio.get_event_loop().run_until_complete(_worker()) -@pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) -@pytest.mark.parametrize("endpoints_per_worker", [20, 80, 320, 640]) -def test_send_recv_cu(num_workers, endpoints_per_worker): +def _test_send_recv_cu(num_workers, endpoints_per_worker): ctx = multiprocessing.get_context("spawn") signal = ctx.Array("i", [0, 0]) @@ -148,3 +146,16 @@ def test_send_recv_cu(num_workers, endpoints_per_worker): worker_process.join() assert worker_process.exitcode == 0 + + +@pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) +@pytest.mark.parametrize("endpoints_per_worker", [20, 80]) +def test_send_recv_cu(num_workers, endpoints_per_worker): + _test_send_recv_cu(num_workers, endpoints_per_worker) + + +@pytest.mark.slow +@pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) +@pytest.mark.parametrize("endpoints_per_worker", 
[320, 640]) +def test_send_recv_cu_slow(num_workers, endpoints_per_worker): + _test_send_recv_cu(num_workers, endpoints_per_worker) From c91ec3cc4ab5807979764c12fb0b3ab6dff05d4c Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 1 Jul 2021 14:00:05 -0700 Subject: [PATCH 08/11] Store remote EP port in dictionary --- tests/test_multiple_processes_all_to_all.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index f2eb45334..b69b34e7c 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -26,7 +26,7 @@ async def create_endpoint_retry(my_port, remote_port, my_task, remote_task): def worker(signal, ports, lock, worker_num, num_workers, endpoints_per_worker): ucp.init() - eps = [] + eps = dict() listener_eps = set() global cluster_started @@ -77,7 +77,7 @@ async def _client(my_port, remote_port, ep=None): ep = await create_endpoint_retry( listener.port, remote_port, "Worker", "Worker" ) - eps.append(ep) + eps[(remote_port, i)] = ep client_tasks.append(_client(listener.port, remote_port, ep)) await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) @@ -89,7 +89,7 @@ async def _client(my_port, remote_port, ep=None): for i in range(3): client_tasks = [] listener_tasks = [] - for ep in eps: + for (remote_port, _), ep in eps.items(): client_tasks.append(_client(listener.port, remote_port, ep)) for listener_ep in listener_eps: listener_tasks.append(_listener(listener_ep)) From 33b630df3d04964709ab28e8d5ca390e9e026c70 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 1 Jul 2021 14:00:42 -0700 Subject: [PATCH 09/11] Mark more tests as slow --- tests/test_multiple_processes_all_to_all.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index b69b34e7c..2bf5e1ef6 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -149,13 +149,13 @@ def _test_send_recv_cu(num_workers, endpoints_per_worker): @pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) -@pytest.mark.parametrize("endpoints_per_worker", [20, 80]) +@pytest.mark.parametrize("endpoints_per_worker", [20, 40]) def test_send_recv_cu(num_workers, endpoints_per_worker): _test_send_recv_cu(num_workers, endpoints_per_worker) @pytest.mark.slow @pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) -@pytest.mark.parametrize("endpoints_per_worker", [320, 640]) +@pytest.mark.parametrize("endpoints_per_worker", [80, 320, 640]) def test_send_recv_cu_slow(num_workers, endpoints_per_worker): _test_send_recv_cu(num_workers, endpoints_per_worker) From b5f0127e10fbab2f8671156cdd1e5c1d4c066a24 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 6 Jul 2021 03:47:28 -0700 Subject: [PATCH 10/11] Limit all-to-all fast test to 20 endpoints per worker --- tests/test_multiple_processes_all_to_all.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index 2bf5e1ef6..1e970a0e1 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -149,7 +149,7 @@ def _test_send_recv_cu(num_workers, endpoints_per_worker): @pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) -@pytest.mark.parametrize("endpoints_per_worker", [20, 40]) 
+@pytest.mark.parametrize("endpoints_per_worker", [20]) def test_send_recv_cu(num_workers, endpoints_per_worker): _test_send_recv_cu(num_workers, endpoints_per_worker) From 137d25f103b585a0d8e9a7b9aab5dca184b4ec30 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 7 Jul 2021 12:46:52 -0700 Subject: [PATCH 11/11] Slightly simplify all-to-all test code --- tests/test_multiple_processes_all_to_all.py | 34 ++++++++++----------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/test_multiple_processes_all_to_all.py b/tests/test_multiple_processes_all_to_all.py index 1e970a0e1..e65be2332 100644 --- a/tests/test_multiple_processes_all_to_all.py +++ b/tests/test_multiple_processes_all_to_all.py @@ -37,26 +37,26 @@ def _register_cluster_started(): global cluster_started cluster_started = True - async def _listener(ep, cache_ep=False): + async def _transfer(ep): msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) - msgs = [ep.send(msg2send), ep.recv(msg2recv)] - await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + msgs = [ep.recv(msg2recv), ep.send(msg2send)] + await asyncio.gather(*msgs) + + async def _listener(ep): + await _transfer(ep) async def _listener_cb(ep): if PersistentEndpoints: listener_eps.add(ep) - await _listener(ep, cache_ep=True) + await _listener(ep) async def _client(my_port, remote_port, ep=None): - msg2send = np.arange(10) - msg2recv = np.empty_like(msg2send) - if ep is None: ep = await create_endpoint_retry(my_port, port, "Worker", "Worker") - msgs = [ep.recv(msg2recv), ep.send(msg2send)] - await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + await _transfer(ep) # Start listener listener = ucp.create_listener(_listener_cb) @@ -79,7 +79,7 @@ async def _client(my_port, remote_port, ep=None): ) eps[(remote_port, i)] = ep client_tasks.append(_client(listener.port, remote_port, ep)) - await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) + await asyncio.gather(*client_tasks) # Wait until listener_eps have all been cached while len(listener_eps) != endpoints_per_worker * (num_workers - 1): @@ -95,7 +95,7 @@ async def _client(my_port, remote_port, ep=None): listener_tasks.append(_listener(listener_ep)) all_tasks = client_tasks + listener_tasks - await asyncio.gather(*all_tasks, loop=asyncio.get_event_loop()) + await asyncio.gather(*all_tasks) else: for i in range(3): # Create endpoints to all other workers @@ -104,7 +104,7 @@ async def _client(my_port, remote_port, ep=None): if port == listener.port: continue client_tasks.append(_client(listener.port, port)) - await asyncio.gather(*client_tasks, loop=asyncio.get_event_loop()) + await asyncio.gather(*client_tasks) with lock: signal[1] += 1 @@ -125,7 +125,7 @@ async def _client(my_port, remote_port, ep=None): asyncio.get_event_loop().run_until_complete(_worker()) -def _test_send_recv_cu(num_workers, endpoints_per_worker): +def _test_multiple_processes_all_to_all(num_workers, endpoints_per_worker): ctx = multiprocessing.get_context("spawn") signal = ctx.Array("i", [0, 0]) @@ -150,12 +150,12 @@ def _test_send_recv_cu(num_workers, endpoints_per_worker): @pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) @pytest.mark.parametrize("endpoints_per_worker", [20]) -def test_send_recv_cu(num_workers, endpoints_per_worker): - _test_send_recv_cu(num_workers, endpoints_per_worker) +def test_multiple_processes_all_to_all(num_workers, endpoints_per_worker): + _test_multiple_processes_all_to_all(num_workers, endpoints_per_worker) @pytest.mark.slow 
@pytest.mark.parametrize("num_workers", [1, 2, 4, 8]) @pytest.mark.parametrize("endpoints_per_worker", [80, 320, 640]) -def test_send_recv_cu_slow(num_workers, endpoints_per_worker): - _test_send_recv_cu(num_workers, endpoints_per_worker) +def test_multiple_processes_all_to_all_slow(num_workers, endpoints_per_worker): + _test_multiple_processes_all_to_all(num_workers, endpoints_per_worker)