From 429fd33569b3966d9a83a530e2149ef8be36ffe0 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev <peter@entschev.com> Date: Thu, 4 Mar 2021 13:53:49 -0800 Subject: [PATCH 1/7] Rename test_multiple_nodes to test_single_process --- tests/{test_multiple_nodes.py => test_single_process.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_multiple_nodes.py => test_single_process.py} (100%) diff --git a/tests/test_multiple_nodes.py b/tests/test_single_process.py similarity index 100% rename from tests/test_multiple_nodes.py rename to tests/test_single_process.py From 627466d18f43200aee867bde0dfc32696b0dae96 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev <peter@entschev.com> Date: Thu, 4 Mar 2021 13:54:10 -0800 Subject: [PATCH 2/7] Adjust number of clients in test_single_process --- tests/test_single_process.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/tests/test_single_process.py b/tests/test_single_process.py index 5cb7e875f..e512c6e9c 100644 --- a/tests/test_single_process.py +++ b/tests/test_single_process.py @@ -29,33 +29,22 @@ async def client_node(port): @pytest.mark.asyncio -async def test_multiple_nodes(): - lf1 = ucp.create_listener(server_node) - lf2 = ucp.create_listener(server_node) - assert lf1.port != lf2.port - - nodes = [] - for _ in range(10): - nodes.append(client_node(lf1.port)) - nodes.append(client_node(lf2.port)) - await asyncio.gather(*nodes, loop=asyncio.get_event_loop()) - - -@pytest.mark.asyncio -async def test_one_server_many_clients(): +async def test_one_listener_many_clients(): lf = ucp.create_listener(server_node) clients = [] - for _ in range(100): + for _ in range(50): clients.append(client_node(lf.port)) await asyncio.gather(*clients, loop=asyncio.get_event_loop()) @pytest.mark.asyncio -async def test_two_servers_many_clients(): +async def test_two_listeners_many_clients(): lf1 = ucp.create_listener(server_node) lf2 = ucp.create_listener(server_node) + assert lf1.port != lf2.port + clients = [] - for _ in range(100): + for _ in range(25): clients.append(client_node(lf1.port)) clients.append(client_node(lf2.port)) await asyncio.gather(*clients, loop=asyncio.get_event_loop()) From 41151431134ab887be0119a32b70c5cc64e51afe Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev <peter@entschev.com> Date: Fri, 5 Mar 2021 15:29:32 -0800 Subject: [PATCH 3/7] Add multiprocess test for many listeners/many clients --- tests/test_multiple_processes.py | 82 ++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 tests/test_multiple_processes.py diff --git a/tests/test_multiple_processes.py b/tests/test_multiple_processes.py new file mode 100644 index 000000000..eca11f870 --- /dev/null +++ b/tests/test_multiple_processes.py @@ -0,0 +1,82 @@ +import asyncio +import random +import multiprocessing + +import numpy as np +import pytest + +import ucp + + +def listener(ports): + ucp.init() + + async def _listener(ports): + async def write(ep): + close_msg = np.empty(1, dtype=np.int64) + msg2send = np.arange(10) + msg2recv = np.empty_like(msg2send) + + msgs = [ep.recv(close_msg), ep.send(msg2send), ep.recv(msg2recv)] + await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + if close_msg[0] != 0: + await ep.close() + listeners[close_msg[0]].close() + + listeners = {} + for port in ports: + listeners[port] = ucp.create_listener(write, port=port) + + try: + while not all(listener.closed() for listener in listeners.values()): + await asyncio.sleep(0.1) + except ucp.UCXCloseError: + pass + + asyncio.get_event_loop().run_until_complete(_listener(ports)) + + +def client(listener_ports): + ucp.init() + + async def _client(listener_ports): + async def read(port, close): + close_msg = ( + np.array(port, dtype=np.int64) if close else np.array(0, dtype=np.int64) + ) + msg2send = np.arange(10) + msg2recv = np.empty_like(msg2send) + + ep = await ucp.create_endpoint(ucp.get_address(), port) + msgs = [ep.send(close_msg), ep.send(msg2send), ep.recv(msg2recv)] + await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) + + close_after = 100 + clients = [] + for i in range(close_after): + for port in listener_ports: + close = i == close_after - 1 + clients.append(read(port, close=close)) + + await asyncio.gather(*clients, loop=asyncio.get_event_loop()) + + asyncio.get_event_loop().run_until_complete(_client(listener_ports)) + + +@pytest.mark.parametrize("num_listeners", [1, 2, 4, 8]) +def test_send_recv_cu(num_listeners): + ports = [random.randint(13000, 15500) for n in range(num_listeners)] + + ctx = multiprocessing.get_context("spawn") + listener_process = ctx.Process(name="listener", target=listener, args=[ports]) + client_process = ctx.Process(name="client", target=client, args=[ports]) + + listener_process.start() + client_process.start() + + listener_process.join() + client_process.join() + + assert listener_process.exitcode == 0 + assert client_process.exitcode == 0 From 72d2879cb204ca4bde8142733f2366478fbe48c4 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev <peter@entschev.com> Date: Mon, 8 Mar 2021 04:06:58 -0800 Subject: [PATCH 4/7] Use bytearray for test_multiple_processes close_msg --- tests/test_multiple_processes.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/test_multiple_processes.py b/tests/test_multiple_processes.py index eca11f870..c20949f41 100644 --- a/tests/test_multiple_processes.py +++ b/tests/test_multiple_processes.py @@ -1,6 +1,7 @@ import asyncio import random import multiprocessing +import sys import numpy as np import pytest @@ -13,16 +14,18 @@ def listener(ports): async def _listener(ports): async def write(ep): - close_msg = np.empty(1, dtype=np.int64) + close_msg = bytearray(2) msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) msgs = [ep.recv(close_msg), ep.send(msg2send), ep.recv(msg2recv)] await asyncio.gather(*msgs, loop=asyncio.get_event_loop()) - if close_msg[0] != 0: + close_msg = int.from_bytes(close_msg, sys.byteorder) + + if close_msg != 0: await ep.close() - listeners[close_msg[0]].close() + listeners[close_msg].close() listeners = {} for port in ports: @@ -42,9 +45,7 @@ def client(listener_ports): async def _client(listener_ports): async def read(port, close): - close_msg = ( - np.array(port, dtype=np.int64) if close else np.array(0, dtype=np.int64) - ) + close_msg = bytearray(int(port if close else 0).to_bytes(2, sys.byteorder)) msg2send = np.arange(10) msg2recv = np.empty_like(msg2send) From 8f3d20de522f987832a13d5c6d6576e6ce6c12ce Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev <peter@entschev.com> Date: Mon, 8 Mar 2021 05:26:56 -0800 Subject: [PATCH 5/7] Avoid duplicating ports in test_multiple_processes --- tests/test_multiple_processes.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test_multiple_processes.py b/tests/test_multiple_processes.py index c20949f41..f216d4838 100644 --- a/tests/test_multiple_processes.py +++ b/tests/test_multiple_processes.py @@ -67,7 +67,13 @@ async def read(port, close): @pytest.mark.parametrize("num_listeners", [1, 2, 4, 8]) def test_send_recv_cu(num_listeners): - ports = [random.randint(13000, 15500) for n in range(num_listeners)] + ports = set() + while len(ports) != num_listeners: + ports = ports.union( + [random.randint(13000, 23000) for n in range(num_listeners)] + ) + print(ports) + ports = list(ports) ctx = multiprocessing.get_context("spawn") listener_process = ctx.Process(name="listener", target=listener, args=[ports]) From 6f43b3094e018cf504b4e44c499279b1ddab8387 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev <peter@entschev.com> Date: Mon, 15 Mar 2021 09:13:12 -0700 Subject: [PATCH 6/7] Fix isort formatting --- tests/test_multiple_processes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_multiple_processes.py b/tests/test_multiple_processes.py index f216d4838..8fe15c441 100644 --- a/tests/test_multiple_processes.py +++ b/tests/test_multiple_processes.py @@ -1,6 +1,6 @@ import asyncio -import random import multiprocessing +import random import sys import numpy as np From 494983e787ae5273aa1acc5a38e070c65b0b3db6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev <peter@entschev.com> Date: Mon, 15 Mar 2021 10:38:57 -0700 Subject: [PATCH 7/7] Remove debugging code --- tests/test_multiple_processes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_multiple_processes.py b/tests/test_multiple_processes.py index 8fe15c441..c8b9f7861 100644 --- a/tests/test_multiple_processes.py +++ b/tests/test_multiple_processes.py @@ -72,7 +72,6 @@ def test_send_recv_cu(num_listeners): ports = ports.union( [random.randint(13000, 23000) for n in range(num_listeners)] ) - print(ports) ports = list(ports) ctx = multiprocessing.get_context("spawn")