From c17d7c65049e89191677d34d19cca8cf3dafd60c Mon Sep 17 00:00:00 2001 From: tpietzsch Date: Sun, 21 Apr 2024 22:53:19 +0200 Subject: [PATCH 01/18] Add forklift channel for flake8 on aarch64 macs --- dev-environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-environment.yml b/dev-environment.yml index 3e35226..e7f9d2c 100644 --- a/dev-environment.yml +++ b/dev-environment.yml @@ -18,6 +18,7 @@ name: appose-dev channels: - conda-forge - defaults + - forklift dependencies: - python >= 3.10 # Developer tools From 55f3e236c0356cb25e4388064a273f6874db9117 Mon Sep 17 00:00:00 2001 From: tpietzsch Date: Mon, 22 Apr 2024 10:52:18 +0200 Subject: [PATCH 02/18] Deserialize appose_type "shm" and "ndarray" --- src/appose/types.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/appose/types.py b/src/appose/types.py index 803f642..4c83b3c 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -29,6 +29,7 @@ import json from typing import Any, Dict +from multiprocessing import shared_memory Args = Dict[str, Any] @@ -38,4 +39,34 @@ def encode(data: Args) -> str: def decode(the_json: str) -> Args: - return json.loads(the_json) + return json.loads(the_json, object_hook=_appose_object_hook) + + +class ShmNDArray: + + def __init__(self, shm: shared_memory.SharedMemory, dtype: str, shape ): + self.shm = shm + self.dtype = dtype + self.shape = shape + + def __str__(self): + return f"ShmNDArray(shm='{self.shm.name}' ({self.shm.size}), dtype='{self.dtype}', shape={self.shape})" + + def ndarray(self): + try: + import math + import numpy + num_elements = math.prod(self.shape) + return numpy.ndarray(num_elements, dtype=self.dtype, buffer=self.shm.buf).reshape(self.shape) + except ModuleNotFoundError: + raise ImportError("NumPy is not available.") + + +def _appose_object_hook(obj: Dict): + type = obj.get('appose_type') + if type == 'shm': + return shared_memory.SharedMemory(name=(obj['name']), size=(obj['size'])) + elif type == 'ndarray': + return ShmNDArray(obj['shm'], obj['dtype'], obj['shape']) + else: + return obj From f5d3b310bbed3c7b103bcf1cf75eab86a5ef24ad Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Tue, 9 Jul 2024 15:34:54 -0500 Subject: [PATCH 03/18] Fix linting errors --- src/appose/types.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/src/appose/types.py b/src/appose/types.py index 4c83b3c..a09781d 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -28,8 +28,8 @@ ### import json -from typing import Any, Dict from multiprocessing import shared_memory +from typing import Any, Dict Args = Dict[str, Any] @@ -43,30 +43,38 @@ def decode(the_json: str) -> Args: class ShmNDArray: - - def __init__(self, shm: shared_memory.SharedMemory, dtype: str, shape ): + def __init__(self, shm: shared_memory.SharedMemory, dtype: str, shape): self.shm = shm self.dtype = dtype self.shape = shape def __str__(self): - return f"ShmNDArray(shm='{self.shm.name}' ({self.shm.size}), dtype='{self.dtype}', shape={self.shape})" + return ( + f"ShmNDArray(" + f"shm='{self.shm.name}' ({self.shm.size}), " + f"dtype='{self.dtype}', " + f"shape={self.shape})" + ) def ndarray(self): try: import math + import numpy + num_elements = math.prod(self.shape) - return numpy.ndarray(num_elements, dtype=self.dtype, buffer=self.shm.buf).reshape(self.shape) + return numpy.ndarray( + num_elements, dtype=self.dtype, buffer=self.shm.buf + ).reshape(self.shape) except ModuleNotFoundError: raise ImportError("NumPy is not available.") def _appose_object_hook(obj: Dict): - type = obj.get('appose_type') - if type == 'shm': - return shared_memory.SharedMemory(name=(obj['name']), size=(obj['size'])) - elif type == 'ndarray': - return ShmNDArray(obj['shm'], obj['dtype'], obj['shape']) + type = obj.get("appose_type") + if type == "shm": + return shared_memory.SharedMemory(name=(obj["name"]), size=(obj["size"])) + elif type == "ndarray": + return ShmNDArray(obj["shm"], obj["dtype"], obj["shape"]) else: return obj From d8e0b79910c77d7f8ebf7089e85b93533ea7f267 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Mon, 15 Jul 2024 12:25:12 -0500 Subject: [PATCH 04/18] Rename ShmNDArray to NDArray For consistency with appose-java. --- src/appose/types.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/appose/types.py b/src/appose/types.py index a09781d..195e3bf 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -42,7 +42,7 @@ def decode(the_json: str) -> Args: return json.loads(the_json, object_hook=_appose_object_hook) -class ShmNDArray: +class NDArray: def __init__(self, shm: shared_memory.SharedMemory, dtype: str, shape): self.shm = shm self.dtype = dtype @@ -50,7 +50,7 @@ def __init__(self, shm: shared_memory.SharedMemory, dtype: str, shape): def __str__(self): return ( - f"ShmNDArray(" + f"NDArray(" f"shm='{self.shm.name}' ({self.shm.size}), " f"dtype='{self.dtype}', " f"shape={self.shape})" @@ -75,6 +75,6 @@ def _appose_object_hook(obj: Dict): if type == "shm": return shared_memory.SharedMemory(name=(obj["name"]), size=(obj["size"])) elif type == "ndarray": - return ShmNDArray(obj["shm"], obj["dtype"], obj["shape"]) + return NDArray(obj["shm"], obj["dtype"], obj["shape"]) else: return obj From fd3c78ff59a184a425777cd66f40ffe0be352db1 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Mon, 15 Jul 2024 12:25:47 -0500 Subject: [PATCH 05/18] Import SharedMemory class directly --- src/appose/types.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/appose/types.py b/src/appose/types.py index 195e3bf..4f6adaa 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -28,7 +28,7 @@ ### import json -from multiprocessing import shared_memory +from multiprocessing.shared_memory import SharedMemory from typing import Any, Dict Args = Dict[str, Any] @@ -43,7 +43,7 @@ def decode(the_json: str) -> Args: class NDArray: - def __init__(self, shm: shared_memory.SharedMemory, dtype: str, shape): + def __init__(self, shm: SharedMemory, dtype: str, shape): self.shm = shm self.dtype = dtype self.shape = shape @@ -73,7 +73,7 @@ def ndarray(self): def _appose_object_hook(obj: Dict): type = obj.get("appose_type") if type == "shm": - return shared_memory.SharedMemory(name=(obj["name"]), size=(obj["size"])) + return SharedMemory(name=(obj["name"]), size=(obj["size"])) elif type == "ndarray": return NDArray(obj["shm"], obj["dtype"], obj["shape"]) else: From 997a7cd543bf4f119445cbc52144b2a1f38f3673 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Mon, 15 Jul 2024 12:26:09 -0500 Subject: [PATCH 06/18] Add docstrings to NDArray class --- src/appose/types.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/appose/types.py b/src/appose/types.py index 4f6adaa..b8d9f33 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -43,7 +43,20 @@ def decode(the_json: str) -> Args: class NDArray: + """ + Data structure for a multi-dimensional array. + The array contains elements of a data type, arranged in + a particular shape, and flattened into SharedMemory. + """ + def __init__(self, shm: SharedMemory, dtype: str, shape): + """ + Create an NDArray. + :param shm: The SharedMemory containing the array data. + :param dtype: The type of the data elements; e.g. int8, uint8, float32, float64. + :param shape: The dimensional extents; e.g. a stack of 7 image planes + with resolution 512x512 would have shape [7, 512, 512]. + """ self.shm = shm self.dtype = dtype self.shape = shape @@ -57,6 +70,11 @@ def __str__(self): ) def ndarray(self): + """ + Create a NumPy ndarray object for working with the array data. + No array data is copied; the NumPy array wraps the same SharedMemory. + Requires the numpy package to be installed. + """ try: import math From c1eb35069cd8044adb930b4c763c47c50a10e36a Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Mon, 15 Jul 2024 12:48:52 -0500 Subject: [PATCH 07/18] Add type hint for NDArray shape parameter --- src/appose/types.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/appose/types.py b/src/appose/types.py index b8d9f33..e843c9f 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -29,7 +29,7 @@ import json from multiprocessing.shared_memory import SharedMemory -from typing import Any, Dict +from typing import Any, Dict, Sequence Args = Dict[str, Any] @@ -49,7 +49,7 @@ class NDArray: a particular shape, and flattened into SharedMemory. """ - def __init__(self, shm: SharedMemory, dtype: str, shape): + def __init__(self, shm: SharedMemory, dtype: str, shape: Sequence[int]): """ Create an NDArray. :param shm: The SharedMemory containing the array data. From 0f893602c64273889641e387765a6a92511f2f51 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Tue, 16 Jul 2024 14:21:30 -0500 Subject: [PATCH 08/18] Swap order of NDArray constructor args By putting shm last, we can give it a default value of None, so that the SharedMemory can be created on the fly if not given. See also apposed/appose-java@434603673eb6354da6b785ef8125a55ebc88ff20 --- src/appose/types.py | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/appose/types.py b/src/appose/types.py index e843c9f..dd11bd1 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -28,8 +28,10 @@ ### import json +import re +from math import ceil, prod from multiprocessing.shared_memory import SharedMemory -from typing import Any, Dict, Sequence +from typing import Any, Dict, Sequence, Union Args = Dict[str, Any] @@ -49,24 +51,30 @@ class NDArray: a particular shape, and flattened into SharedMemory. """ - def __init__(self, shm: SharedMemory, dtype: str, shape: Sequence[int]): + def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None): """ Create an NDArray. - :param shm: The SharedMemory containing the array data. :param dtype: The type of the data elements; e.g. int8, uint8, float32, float64. :param shape: The dimensional extents; e.g. a stack of 7 image planes with resolution 512x512 would have shape [7, 512, 512]. + :param shm: The SharedMemory containing the array data, or None to create it. """ - self.shm = shm self.dtype = dtype self.shape = shape + self.shm = ( + SharedMemory( + create=True, size=ceil(prod(shape) * _bytes_per_element(dtype)) + ) + if shm is None + else shm + ) def __str__(self): return ( f"NDArray(" - f"shm='{self.shm.name}' ({self.shm.size}), " f"dtype='{self.dtype}', " - f"shape={self.shape})" + f"shape={self.shape}, " + f"shm='{self.shm.name}' ({self.shm.size}))" ) def ndarray(self): @@ -76,13 +84,10 @@ def ndarray(self): Requires the numpy package to be installed. """ try: - import math - import numpy - num_elements = math.prod(self.shape) return numpy.ndarray( - num_elements, dtype=self.dtype, buffer=self.shm.buf + prod(self.shape), dtype=self.dtype, buffer=self.shm.buf ).reshape(self.shape) except ModuleNotFoundError: raise ImportError("NumPy is not available.") @@ -93,6 +98,14 @@ def _appose_object_hook(obj: Dict): if type == "shm": return SharedMemory(name=(obj["name"]), size=(obj["size"])) elif type == "ndarray": - return NDArray(obj["shm"], obj["dtype"], obj["shape"]) + return NDArray(obj["dtype"], obj["shape"], obj["shm"]) else: return obj + + +def _bytes_per_element(dtype: str) -> Union[int, float]: + try: + bits = int(re.sub("[^0-9]", "", dtype)) + except ValueError: + raise ValueError(f"Invalid dtype: {dtype}") + return bits / 8 From 934711c505b5653fea6a380c56682c64c7adc7ae Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Tue, 16 Jul 2024 14:22:55 -0500 Subject: [PATCH 09/18] Avoid using `type` as a variable name Let's not shadow the built-in type function. --- src/appose/types.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/appose/types.py b/src/appose/types.py index dd11bd1..ad3fee8 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -94,10 +94,10 @@ def ndarray(self): def _appose_object_hook(obj: Dict): - type = obj.get("appose_type") - if type == "shm": + atype = obj.get("appose_type") + if atype == "shm": return SharedMemory(name=(obj["name"]), size=(obj["size"])) - elif type == "ndarray": + elif atype == "ndarray": return NDArray(obj["dtype"], obj["shape"], obj["shm"]) else: return obj From 17d76c809e07c8ab111575ca8e1c068d40f40d0d Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Tue, 16 Jul 2024 15:23:54 -0500 Subject: [PATCH 10/18] Make encoded JSON as compact as possible Not likely to make an actual performance difference, but may as well. This also better matches the JSON-encoding behavior of appose-java. --- src/appose/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/appose/types.py b/src/appose/types.py index ad3fee8..fe39729 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -37,7 +37,7 @@ def encode(data: Args) -> str: - return json.dumps(data) + return json.dumps(data, separators=(",", ":")) def decode(the_json: str) -> Args: From 8394df8ae6d70233b103ed79dc32b247a3fb60d8 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Tue, 16 Jul 2024 14:33:42 -0500 Subject: [PATCH 11/18] Add tests for JSON en/de-code functions --- tests/test_types.py | 81 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 tests/test_types.py diff --git a/tests/test_types.py b/tests/test_types.py new file mode 100644 index 0000000..779e821 --- /dev/null +++ b/tests/test_types.py @@ -0,0 +1,81 @@ +import unittest + +import appose + + +class TypesTest(unittest.TestCase): + JSON = ( + "{" + '"posByte":123,"negByte":-98,' + '"posDouble":9.876543210123456,"negDouble":-1.234567890987654e+302,' + '"posFloat":9.876543,"negFloat":-1.2345678,' + '"posInt":1234567890,"negInt":-987654321,' + '"posLong":12345678987654321,"negLong":-98765432123456789,' + '"posShort":32109,"negShort":-23456,' + '"trueBoolean":true,"falseBoolean":false,' + '"nullChar":"\\u0000",' + '"aString":"-=[]\\\\;\',./_+{}|:\\"<>?' + "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz" + '~!@#$%^&*()",' + '"numbers":[1,1,2,3,5,8],' + '"words":["quick","brown","fox"]' + "}" + ) + + STRING = ( + "-=[]\\;',./_+{}|:\"<>?" + "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz" + "~!@#$%^&*()" + ) + + NUMBERS = [1, 1, 2, 3, 5, 8] + + WORDS = ["quick", "brown", "fox"] + + def test_encode(self): + data = { + "posByte": 123, + "negByte": -98, + "posDouble": 9.876543210123456, + "negDouble": -1.234567890987654e302, + "posFloat": 9.876543, + "negFloat": -1.2345678, + "posInt": 1234567890, + "negInt": -987654321, + "posLong": 12345678987654321, + "negLong": -98765432123456789, + "posShort": 32109, + "negShort": -23456, + "trueBoolean": True, + "falseBoolean": False, + "nullChar": "\0", + "aString": self.STRING, + "numbers": self.NUMBERS, + "words": self.WORDS, + } + json_str = appose.types.encode(data) + self.assertIsNotNone(json_str) + self.assertEqual(self.JSON, json_str) + + def test_decode(self): + data = appose.types.decode(self.JSON) + self.assertIsNotNone(data) + self.assertEqual(18, len(data)) + self.assertEqual(123, data["posByte"]) + self.assertEqual(-98, data["negByte"]) + self.assertEqual(9.876543210123456, data["posDouble"]) + self.assertEqual(-1.234567890987654e302, data["negDouble"]) + self.assertEqual(9.876543, data["posFloat"]) + self.assertEqual(-1.2345678, data["negFloat"]) + self.assertEqual(1234567890, data["posInt"]) + self.assertEqual(-987654321, data["negInt"]) + self.assertEqual(12345678987654321, data["posLong"]) + self.assertEqual(-98765432123456789, data["negLong"]) + self.assertEqual(32109, data["posShort"]) + self.assertEqual(-23456, data["negShort"]) + self.assertTrue(data["trueBoolean"]) + self.assertFalse(data["falseBoolean"]) + self.assertEqual("\0", data["nullChar"]) + self.assertEqual(self.STRING, data["aString"]) + self.assertEqual(self.NUMBERS, data["numbers"]) + self.assertEqual(self.WORDS, data["words"]) From 670ee9c14c9036b60e98773e6e41a98da7297f5b Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Tue, 16 Jul 2024 14:23:30 -0500 Subject: [PATCH 12/18] Support JSON-encoding of NDArrays --- src/appose/types.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/appose/types.py b/src/appose/types.py index fe39729..855b7df 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -37,7 +37,7 @@ def encode(data: Args) -> str: - return json.dumps(data, separators=(",", ":")) + return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":")) def decode(the_json: str) -> Args: @@ -93,6 +93,24 @@ def ndarray(self): raise ImportError("NumPy is not available.") +class _ApposeJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, SharedMemory): + return { + "appose_type": "shm", + "name": obj.name, + "size": obj.size, + } + if isinstance(obj, NDArray): + return { + "appose_type": "ndarray", + "dtype": obj.dtype, + "shape": obj.shape, + "shm": obj.shm, + } + return super().default(obj) + + def _appose_object_hook(obj: Dict): atype = obj.get("appose_type") if atype == "shm": From 51b6b0c10ffacb873b1a7980a55de4346e0a1835 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Tue, 16 Jul 2024 14:33:06 -0500 Subject: [PATCH 13/18] Add tests for SharedMemory and NDArray --- tests/test_shm.py | 66 +++++++++++++++++++++++++++++++++++++++++++++ tests/test_types.py | 32 +++++++++++++++++++--- 2 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 tests/test_shm.py diff --git a/tests/test_shm.py b/tests/test_shm.py new file mode 100644 index 0000000..e77e86f --- /dev/null +++ b/tests/test_shm.py @@ -0,0 +1,66 @@ +### +# #%L +# Appose: multi-language interprocess cooperation with shared memory. +# %% +# Copyright (C) 2023 Appose developers. +# %% +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# #L% +### + +from multiprocessing.shared_memory import SharedMemory + +import appose +from appose.service import TaskStatus +from appose.types import NDArray + +ndarray_inspect = """ +task.outputs["size"] = data.shm.size +task.outputs["dtype"] = data.dtype +task.outputs["shape"] = data.shape +task.outputs["sum"] = sum(v for v in data.shm.buf) +""" + + +def test_ndarray(): + env = appose.system() + with env.python() as service: + # Construct the data. + shm = SharedMemory(create=True, size=2 * 2 * 20 * 25) + shm.buf[0] = 123 + shm.buf[456] = 78 + shm.buf[1999] = 210 + data = NDArray("uint16", [2, 20, 25], shm) + + # Run the task. + task = service.task(ndarray_inspect, {"data": data}) + task.wait_for() + + # Validate the execution result. + assert TaskStatus.COMPLETE == task.status + assert 2 * 20 * 25 * 2 == task.outputs["size"] + assert "uint16" == task.outputs["dtype"] + assert [2, 20, 25] == task.outputs["shape"] + assert 123 + 78 + 210 == task.outputs["sum"] + + # Clean up. + shm.unlink() diff --git a/tests/test_types.py b/tests/test_types.py index 779e821..52a1c46 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,4 +1,5 @@ import unittest +from multiprocessing.shared_memory import SharedMemory import appose @@ -18,7 +19,19 @@ class TypesTest(unittest.TestCase): "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz" '~!@#$%^&*()",' '"numbers":[1,1,2,3,5,8],' - '"words":["quick","brown","fox"]' + '"words":["quick","brown","fox"],' + # fmt: off + '"ndArray":{' + '"appose_type":"ndarray",' # noqa: E131 + '"dtype":"float32",' # noqa: E131 + '"shape":[2,20,25],' # noqa: E131 + '"shm":{' # noqa: E131 + '"appose_type":"shm",' # noqa: E131 + '"name":"SHM_NAME",' # noqa: E131 + '"size":4000' # noqa: E131 + "}" # noqa: E131 + "}" + # fmt: on "}" ) @@ -53,14 +66,21 @@ def test_encode(self): "numbers": self.NUMBERS, "words": self.WORDS, } + ndarray = appose.types.NDArray("float32", [2, 20, 25]) + shm_name = ndarray.shm.name + data["ndArray"] = ndarray json_str = appose.types.encode(data) self.assertIsNotNone(json_str) - self.assertEqual(self.JSON, json_str) + expected = self.JSON.replace("SHM_NAME", shm_name) + self.assertEqual(expected, json_str) + ndarray.shm.unlink() def test_decode(self): - data = appose.types.decode(self.JSON) + shm = SharedMemory(create=True, size=4000) + shm_name = shm.name + data = appose.types.decode(self.JSON.replace("SHM_NAME", shm_name)) self.assertIsNotNone(data) - self.assertEqual(18, len(data)) + self.assertEqual(19, len(data)) self.assertEqual(123, data["posByte"]) self.assertEqual(-98, data["negByte"]) self.assertEqual(9.876543210123456, data["posDouble"]) @@ -79,3 +99,7 @@ def test_decode(self): self.assertEqual(self.STRING, data["aString"]) self.assertEqual(self.NUMBERS, data["numbers"]) self.assertEqual(self.WORDS, data["words"]) + ndArray = data["ndArray"] + self.assertEqual("float32", ndArray.dtype) + self.assertEqual([2, 20, 25], ndArray.shape) + shm.unlink() From 8c2748effd3b1d4d7147442aa37e5f622bb657bc Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Wed, 17 Jul 2024 19:43:27 -0500 Subject: [PATCH 14/18] Make appose.SharedMemory and appose.NDArray work So that these classes are in the same package (the base one) as the Java codebase's equivalents. And so that people don't need to know to import SharedMemory from multiprocessing.shared_memory, which is kind of a keyboardful. --- src/appose/__init__.py | 1 + tests/test_shm.py | 7 ++----- tests/test_types.py | 5 ++--- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/appose/__init__.py b/src/appose/__init__.py index 93c44c3..db1141a 100644 --- a/src/appose/__init__.py +++ b/src/appose/__init__.py @@ -134,6 +134,7 @@ def task_listener(event): from pathlib import Path from .environment import Builder, Environment +from .types import NDArray, SharedMemory # noqa: F401 def base(directory: Path) -> Builder: diff --git a/tests/test_shm.py b/tests/test_shm.py index e77e86f..965e398 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -27,11 +27,8 @@ # #L% ### -from multiprocessing.shared_memory import SharedMemory - import appose from appose.service import TaskStatus -from appose.types import NDArray ndarray_inspect = """ task.outputs["size"] = data.shm.size @@ -45,11 +42,11 @@ def test_ndarray(): env = appose.system() with env.python() as service: # Construct the data. - shm = SharedMemory(create=True, size=2 * 2 * 20 * 25) + shm = appose.SharedMemory(create=True, size=2 * 2 * 20 * 25) shm.buf[0] = 123 shm.buf[456] = 78 shm.buf[1999] = 210 - data = NDArray("uint16", [2, 20, 25], shm) + data = appose.NDArray("uint16", [2, 20, 25], shm) # Run the task. task = service.task(ndarray_inspect, {"data": data}) diff --git a/tests/test_types.py b/tests/test_types.py index 52a1c46..445c749 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,5 +1,4 @@ import unittest -from multiprocessing.shared_memory import SharedMemory import appose @@ -66,7 +65,7 @@ def test_encode(self): "numbers": self.NUMBERS, "words": self.WORDS, } - ndarray = appose.types.NDArray("float32", [2, 20, 25]) + ndarray = appose.NDArray("float32", [2, 20, 25]) shm_name = ndarray.shm.name data["ndArray"] = ndarray json_str = appose.types.encode(data) @@ -76,7 +75,7 @@ def test_encode(self): ndarray.shm.unlink() def test_decode(self): - shm = SharedMemory(create=True, size=4000) + shm = appose.SharedMemory(create=True, size=4000) shm_name = shm.name data = appose.types.decode(self.JSON.replace("SHM_NAME", shm_name)) self.assertIsNotNone(data) From 62ffa11accc756e3cfd2e90abdde4a8168bfd538 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Sat, 10 Aug 2024 14:04:50 -0500 Subject: [PATCH 15/18] Add docstring for Python worker --- src/appose/python_worker.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/appose/python_worker.py b/src/appose/python_worker.py index 551e931..dc4f8e1 100644 --- a/src/appose/python_worker.py +++ b/src/appose/python_worker.py @@ -28,7 +28,14 @@ ### """ -TODO +The Appose worker for running Python scripts. + +Like all Appose workers, this program conforms to the Appose worker process +contract, meaning it accepts requests on stdin and produces responses on +stdout, both formatted according to Appose's assumptions. + +For details, see the Appose README: +https://github.com/apposed/appose/blob/-/README.md#workers """ import ast From c6a1ed69db27a723036dc87bcd1b3d8f168704cb Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Sat, 10 Aug 2024 14:52:50 -0500 Subject: [PATCH 16/18] Disable Python worker's resource tracker To keep things manageable, we never want Python worker processes cleaning up shared memory blocks. We leave it to the service process to do that, always. --- src/appose/python_worker.py | 4 +++- src/appose/types.py | 33 +++++++++++++++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/appose/python_worker.py b/src/appose/python_worker.py index dc4f8e1..1cf881b 100644 --- a/src/appose/python_worker.py +++ b/src/appose/python_worker.py @@ -46,7 +46,7 @@ # NB: Avoid relative imports so that this script can be run standalone. from appose.service import RequestType, ResponseType -from appose.types import Args, decode, encode +from appose.types import Args, _set_worker, decode, encode class Task: @@ -163,6 +163,8 @@ def _respond(self, response_type: ResponseType, args: Optional[Args]) -> None: def main() -> None: + _set_worker(True) + tasks = {} while True: diff --git a/src/appose/types.py b/src/appose/types.py index 855b7df..dbd877e 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -30,6 +30,7 @@ import json import re from math import ceil, prod +from multiprocessing import resource_tracker from multiprocessing.shared_memory import SharedMemory from typing import Any, Dict, Sequence, Union @@ -62,9 +63,7 @@ def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None): self.dtype = dtype self.shape = shape self.shm = ( - SharedMemory( - create=True, size=ceil(prod(shape) * _bytes_per_element(dtype)) - ) + _create_shm(create=True, size=ceil(prod(shape) * _bytes_per_element(dtype))) if shm is None else shm ) @@ -114,7 +113,8 @@ def default(self, obj): def _appose_object_hook(obj: Dict): atype = obj.get("appose_type") if atype == "shm": - return SharedMemory(name=(obj["name"]), size=(obj["size"])) + # Attach to existing shared memory block. + return _create_shm(name=(obj["name"]), size=(obj["size"])) elif atype == "ndarray": return NDArray(obj["dtype"], obj["shape"], obj["shm"]) else: @@ -127,3 +127,28 @@ def _bytes_per_element(dtype: str) -> Union[int, float]: except ValueError: raise ValueError(f"Invalid dtype: {dtype}") return bits / 8 + + +def _create_shm(name: str = None, create: bool = False, size: int = 0): + shm = SharedMemory(name=name, create=create, size=size) + if _is_worker: + # HACK: Disable this process's resource_tracker, which wants to clean up + # shared memory blocks after all known references are done using them. + # + # There is one resource_tracker per Python process, and they will each + # try to delete shared memory blocks known to them when they are + # shutting down, even when other processes still need them. + # + # As such, the rule Appose follows is: let the service process always + # do the cleanup of shared memory blocks, regardless of which process + # initially allocated it. + resource_tracker.unregister(shm._name, "shared_memory") + return shm + + +_is_worker = False + + +def _set_worker(value: bool) -> None: + global _is_worker + _is_worker = value From 522f0e9e5de76e9e873fd237d952c73a15ca4b69 Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Sat, 10 Aug 2024 14:57:03 -0500 Subject: [PATCH 17/18] Remove obsolete TODO comments --- src/appose/python_worker.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/appose/python_worker.py b/src/appose/python_worker.py index 1cf881b..97f2605 100644 --- a/src/appose/python_worker.py +++ b/src/appose/python_worker.py @@ -87,7 +87,6 @@ def _start(self, script: str, inputs: Optional[Args]) -> None: def execute_script(): # Populate script bindings. binding = {"task": self} - # TODO: Magically convert shared memory image inputs. if inputs is not None: binding.update(inputs) @@ -190,8 +189,6 @@ def main() -> None: case RequestType.CANCEL: task = tasks.get(uuid) if task is None: - # TODO: proper logging - # Maybe should stdout the error back to Appose calling process. print(f"No such task: {uuid}", file=sys.stderr) continue task.cancel_requested = True From 84d45e6c5a89d69c1dc64c469b93370c1bd33f3e Mon Sep 17 00:00:00 2001 From: Curtis Rueden Date: Sat, 10 Aug 2024 16:22:41 -0500 Subject: [PATCH 18/18] Let SharedMemory and NDArray support `with` blocks To do this, we subclass Python's SharedMemory class, rather than simply passing it along verbatim anymore. And we implement the __exit__ method to call the Appose SharedMemory subclass's dispose() method, which calls either close() or unlink() depending on whether the unlink_on_dispose flag is set. Note that this implementation cannot quite align with the Java one, because in Java, the AutoCloseable interface always calls close(). As such, the close() method must be overridden and taught to sometimes call unlink(), and sometimes not, depending on the unlinkOnClose flag. --- src/appose/types.py | 85 ++++++++++++++++++++++++++++++++++----------- tests/test_shm.py | 33 ++++++++---------- tests/test_types.py | 68 ++++++++++++++++++------------------ 3 files changed, 112 insertions(+), 74 deletions(-) diff --git a/src/appose/types.py b/src/appose/types.py index dbd877e..363ea7e 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -30,13 +30,65 @@ import json import re from math import ceil, prod -from multiprocessing import resource_tracker -from multiprocessing.shared_memory import SharedMemory +from multiprocessing import resource_tracker, shared_memory from typing import Any, Dict, Sequence, Union Args = Dict[str, Any] +class SharedMemory(shared_memory.SharedMemory): + """ + An enhanced version of Python's multiprocessing.shared_memory.SharedMemory + class which can be used with a `with` statement. When the program flow + exits the `with` block, this class's `dispose()` method will be invoked, + which might call `close()` or `unlink()` depending on the value of its + `unlink_on_dispose` flag. + """ + + def __init__(self, name: str = None, create: bool = False, size: int = 0): + super().__init__(name=name, create=create, size=size) + self._unlink_on_dispose = create + if _is_worker: + # HACK: Remove this shared memory block from the resource_tracker, + # which wants to clean up shared memory blocks after all known + # references are done using them. + # + # There is one resource_tracker per Python process, and they will + # each try to delete shared memory blocks known to them when they + # are shutting down, even when other processes still need them. + # + # As such, the rule Appose follows is: let the service process + # always handle cleanup of shared memory blocks, regardless of + # which process initially allocated it. + resource_tracker.unregister(self._name, "shared_memory") + + def unlink_on_dispose(self, value: bool) -> None: + """ + Set whether the `unlink()` method should be invoked to destroy + the shared memory block when the `dispose()` method is called. + + Note: dispose() is the method called when exiting a `with` block. + + By default, shared memory objects constructed with `create=True` + will behave this way, whereas shared memory objects constructed + with `create=False` will not. But this method allows to override + the behavior. + """ + self._unlink_on_dispose = value + + def dispose(self) -> None: + if self._unlink_on_dispose: + self.unlink() + else: + self.close() + + def __enter__(self) -> "SharedMemory": + return self + + def __exit__(self, exc_type, exc_value, exc_tb) -> None: + self.dispose() + + def encode(data: Args) -> str: return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":")) @@ -63,7 +115,9 @@ def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None): self.dtype = dtype self.shape = shape self.shm = ( - _create_shm(create=True, size=ceil(prod(shape) * _bytes_per_element(dtype))) + SharedMemory( + create=True, size=ceil(prod(shape) * _bytes_per_element(dtype)) + ) if shm is None else shm ) @@ -91,6 +145,12 @@ def ndarray(self): except ModuleNotFoundError: raise ImportError("NumPy is not available.") + def __enter__(self) -> "NDArray": + return self + + def __exit__(self, exc_type, exc_value, exc_tb) -> None: + self.shm.dispose() + class _ApposeJSONEncoder(json.JSONEncoder): def default(self, obj): @@ -114,7 +174,7 @@ def _appose_object_hook(obj: Dict): atype = obj.get("appose_type") if atype == "shm": # Attach to existing shared memory block. - return _create_shm(name=(obj["name"]), size=(obj["size"])) + return SharedMemory(name=(obj["name"]), size=(obj["size"])) elif atype == "ndarray": return NDArray(obj["dtype"], obj["shape"], obj["shm"]) else: @@ -129,23 +189,6 @@ def _bytes_per_element(dtype: str) -> Union[int, float]: return bits / 8 -def _create_shm(name: str = None, create: bool = False, size: int = 0): - shm = SharedMemory(name=name, create=create, size=size) - if _is_worker: - # HACK: Disable this process's resource_tracker, which wants to clean up - # shared memory blocks after all known references are done using them. - # - # There is one resource_tracker per Python process, and they will each - # try to delete shared memory blocks known to them when they are - # shutting down, even when other processes still need them. - # - # As such, the rule Appose follows is: let the service process always - # do the cleanup of shared memory blocks, regardless of which process - # initially allocated it. - resource_tracker.unregister(shm._name, "shared_memory") - return shm - - _is_worker = False diff --git a/tests/test_shm.py b/tests/test_shm.py index 965e398..68dcf8b 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -41,23 +41,20 @@ def test_ndarray(): env = appose.system() with env.python() as service: - # Construct the data. - shm = appose.SharedMemory(create=True, size=2 * 2 * 20 * 25) - shm.buf[0] = 123 - shm.buf[456] = 78 - shm.buf[1999] = 210 - data = appose.NDArray("uint16", [2, 20, 25], shm) + with appose.SharedMemory(create=True, size=2 * 2 * 20 * 25) as shm: + # Construct the data. + shm.buf[0] = 123 + shm.buf[456] = 78 + shm.buf[1999] = 210 + data = appose.NDArray("uint16", [2, 20, 25], shm) - # Run the task. - task = service.task(ndarray_inspect, {"data": data}) - task.wait_for() + # Run the task. + task = service.task(ndarray_inspect, {"data": data}) + task.wait_for() - # Validate the execution result. - assert TaskStatus.COMPLETE == task.status - assert 2 * 20 * 25 * 2 == task.outputs["size"] - assert "uint16" == task.outputs["dtype"] - assert [2, 20, 25] == task.outputs["shape"] - assert 123 + 78 + 210 == task.outputs["sum"] - - # Clean up. - shm.unlink() + # Validate the execution result. + assert TaskStatus.COMPLETE == task.status + assert 2 * 20 * 25 * 2 == task.outputs["size"] + assert "uint16" == task.outputs["dtype"] + assert [2, 20, 25] == task.outputs["shape"] + assert 123 + 78 + 210 == task.outputs["sum"] diff --git a/tests/test_types.py b/tests/test_types.py index 445c749..dc38d23 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -65,40 +65,38 @@ def test_encode(self): "numbers": self.NUMBERS, "words": self.WORDS, } - ndarray = appose.NDArray("float32", [2, 20, 25]) - shm_name = ndarray.shm.name - data["ndArray"] = ndarray - json_str = appose.types.encode(data) - self.assertIsNotNone(json_str) - expected = self.JSON.replace("SHM_NAME", shm_name) - self.assertEqual(expected, json_str) - ndarray.shm.unlink() + with appose.NDArray("float32", [2, 20, 25]) as ndarray: + shm_name = ndarray.shm.name + data["ndArray"] = ndarray + json_str = appose.types.encode(data) + self.assertIsNotNone(json_str) + expected = self.JSON.replace("SHM_NAME", shm_name) + self.assertEqual(expected, json_str) def test_decode(self): - shm = appose.SharedMemory(create=True, size=4000) - shm_name = shm.name - data = appose.types.decode(self.JSON.replace("SHM_NAME", shm_name)) - self.assertIsNotNone(data) - self.assertEqual(19, len(data)) - self.assertEqual(123, data["posByte"]) - self.assertEqual(-98, data["negByte"]) - self.assertEqual(9.876543210123456, data["posDouble"]) - self.assertEqual(-1.234567890987654e302, data["negDouble"]) - self.assertEqual(9.876543, data["posFloat"]) - self.assertEqual(-1.2345678, data["negFloat"]) - self.assertEqual(1234567890, data["posInt"]) - self.assertEqual(-987654321, data["negInt"]) - self.assertEqual(12345678987654321, data["posLong"]) - self.assertEqual(-98765432123456789, data["negLong"]) - self.assertEqual(32109, data["posShort"]) - self.assertEqual(-23456, data["negShort"]) - self.assertTrue(data["trueBoolean"]) - self.assertFalse(data["falseBoolean"]) - self.assertEqual("\0", data["nullChar"]) - self.assertEqual(self.STRING, data["aString"]) - self.assertEqual(self.NUMBERS, data["numbers"]) - self.assertEqual(self.WORDS, data["words"]) - ndArray = data["ndArray"] - self.assertEqual("float32", ndArray.dtype) - self.assertEqual([2, 20, 25], ndArray.shape) - shm.unlink() + with appose.SharedMemory(create=True, size=4000) as shm: + shm_name = shm.name + data = appose.types.decode(self.JSON.replace("SHM_NAME", shm_name)) + self.assertIsNotNone(data) + self.assertEqual(19, len(data)) + self.assertEqual(123, data["posByte"]) + self.assertEqual(-98, data["negByte"]) + self.assertEqual(9.876543210123456, data["posDouble"]) + self.assertEqual(-1.234567890987654e302, data["negDouble"]) + self.assertEqual(9.876543, data["posFloat"]) + self.assertEqual(-1.2345678, data["negFloat"]) + self.assertEqual(1234567890, data["posInt"]) + self.assertEqual(-987654321, data["negInt"]) + self.assertEqual(12345678987654321, data["posLong"]) + self.assertEqual(-98765432123456789, data["negLong"]) + self.assertEqual(32109, data["posShort"]) + self.assertEqual(-23456, data["negShort"]) + self.assertTrue(data["trueBoolean"]) + self.assertFalse(data["falseBoolean"]) + self.assertEqual("\0", data["nullChar"]) + self.assertEqual(self.STRING, data["aString"]) + self.assertEqual(self.NUMBERS, data["numbers"]) + self.assertEqual(self.WORDS, data["words"]) + ndArray = data["ndArray"] + self.assertEqual("float32", ndArray.dtype) + self.assertEqual([2, 20, 25], ndArray.shape)