Skip to content

Commit

Permalink
Merge pull request #1 from apposed/schmarrn
Browse files Browse the repository at this point in the history
`SharedMemory` and `ndarray` serialization
  • Loading branch information
ctrueden authored Aug 10, 2024
2 parents 04b1f2a + 84d45e6 commit 2350958
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 8 deletions.
1 change: 1 addition & 0 deletions dev-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ name: appose-dev
channels:
- conda-forge
- defaults
- forklift
dependencies:
- python >= 3.10
# Developer tools
Expand Down
1 change: 1 addition & 0 deletions src/appose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def task_listener(event):
from pathlib import Path

from .environment import Builder, Environment
from .types import NDArray, SharedMemory # noqa: F401


def base(directory: Path) -> Builder:
Expand Down
16 changes: 11 additions & 5 deletions src/appose/python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
###

"""
TODO
The Appose worker for running Python scripts.
Like all Appose workers, this program conforms to the Appose worker process
contract, meaning it accepts requests on stdin and produces responses on
stdout, both formatted according to Appose's assumptions.
For details, see the Appose README:
https://github.com/apposed/appose/blob/-/README.md#workers
"""

import ast
Expand All @@ -39,7 +46,7 @@

# NB: Avoid relative imports so that this script can be run standalone.
from appose.service import RequestType, ResponseType
from appose.types import Args, decode, encode
from appose.types import Args, _set_worker, decode, encode


class Task:
Expand Down Expand Up @@ -80,7 +87,6 @@ def _start(self, script: str, inputs: Optional[Args]) -> None:
def execute_script():
# Populate script bindings.
binding = {"task": self}
# TODO: Magically convert shared memory image inputs.
if inputs is not None:
binding.update(inputs)

Expand Down Expand Up @@ -156,6 +162,8 @@ def _respond(self, response_type: ResponseType, args: Optional[Args]) -> None:


def main() -> None:
_set_worker(True)

tasks = {}

while True:
Expand All @@ -181,8 +189,6 @@ def main() -> None:
case RequestType.CANCEL:
task = tasks.get(uuid)
if task is None:
# TODO: proper logging
# Maybe should stdout the error back to Appose calling process.
print(f"No such task: {uuid}", file=sys.stderr)
continue
task.cancel_requested = True
Expand Down
162 changes: 159 additions & 3 deletions src/appose/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,170 @@
###

import json
from typing import Any, Dict
import re
from math import ceil, prod
from multiprocessing import resource_tracker, shared_memory
from typing import Any, Dict, Sequence, Union

Args = Dict[str, Any]


class SharedMemory(shared_memory.SharedMemory):
    """
    An enhanced version of Python's multiprocessing.shared_memory.SharedMemory
    class which can be used with a `with` statement. When the program flow
    exits the `with` block, this class's `dispose()` method will be invoked,
    which might call `close()` or `unlink()` depending on the value of its
    `unlink_on_dispose` flag.
    """

    def __init__(self, name: str | None = None, create: bool = False, size: int = 0):
        """
        Create a new shared memory block, or attach to an existing one.

        :param name: Unique name of the shared memory block, or None to have
                     one generated when creating (see the superclass docs).
        :param create: True to create a new block, False to attach to an
                       existing one. Blocks created here (create=True) are
                       unlinked on dispose by default; attached blocks are
                       merely closed.
        :param size: Requested size in bytes when creating a new block.
        """
        super().__init__(name=name, create=create, size=size)
        # The creator of a block is, by default, responsible for destroying it.
        self._unlink_on_dispose = create
        if _is_worker:
            # HACK: Remove this shared memory block from the resource_tracker,
            # which wants to clean up shared memory blocks after all known
            # references are done using them.
            #
            # There is one resource_tracker per Python process, and they will
            # each try to delete shared memory blocks known to them when they
            # are shutting down, even when other processes still need them.
            #
            # As such, the rule Appose follows is: let the service process
            # always handle cleanup of shared memory blocks, regardless of
            # which process initially allocated it.
            #
            # NOTE: self._name is the superclass's private name attribute --
            # the same name that the superclass registered with the tracker.
            resource_tracker.unregister(self._name, "shared_memory")

    def unlink_on_dispose(self, value: bool) -> None:
        """
        Set whether the `unlink()` method should be invoked to destroy
        the shared memory block when the `dispose()` method is called.
        Note: dispose() is the method called when exiting a `with` block.
        By default, shared memory objects constructed with `create=True`
        will behave this way, whereas shared memory objects constructed
        with `create=False` will not. But this method allows to override
        the behavior.

        :param value: True to unlink (destroy) on dispose, False to only close.
        """
        self._unlink_on_dispose = value

    def dispose(self) -> None:
        """
        Release this process's handle on the block: destroy it entirely
        (`unlink()`) if this object owns it, otherwise just detach (`close()`).
        """
        if self._unlink_on_dispose:
            self.unlink()
        else:
            self.close()

    def __enter__(self) -> "SharedMemory":
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        # Exiting a `with` block always disposes, even on exceptions.
        self.dispose()


def encode(data: Args) -> str:
    """
    Serialize the given Args dictionary to a compact JSON string.

    Appose-specific types (SharedMemory, NDArray) are encoded as
    `appose_type`-tagged dicts via _ApposeJSONEncoder, so they can be
    reconstructed on the other side by `decode`.

    :param data: Dictionary of arguments to serialize.
    :return: Compact JSON string (no spaces after separators).
    """
    # NB: The plain `json.dumps(data)` form is gone -- it bypassed the
    # custom encoder and could not handle SharedMemory/NDArray values.
    return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":"))


def decode(the_json: str) -> Args:
    """
    Deserialize the given JSON string into an Args dictionary.

    Dicts tagged with an `appose_type` key (as produced by `encode`) are
    reconstructed into their corresponding Python objects (SharedMemory,
    NDArray) via _appose_object_hook.

    :param the_json: The JSON string to parse.
    :return: Dictionary of arguments with Appose types reconstructed.
    """
    # NB: The plain `json.loads(the_json)` form is gone -- it skipped the
    # object hook and left tagged dicts as plain dicts.
    return json.loads(the_json, object_hook=_appose_object_hook)


class NDArray:
    """
    Data structure for a multi-dimensional array: elements of a given
    data type, arranged in a particular shape, with the flattened data
    stored in a SharedMemory block.
    """

    def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None):
        """
        Create an NDArray.
        :param dtype: The type of the data elements; e.g. int8, uint8, float32, float64.
        :param shape: The dimensional extents; e.g. a stack of 7 image planes
                      with resolution 512x512 would have shape [7, 512, 512].
        :param shm: The SharedMemory containing the array data, or None to create it.
        """
        self.dtype = dtype
        self.shape = shape
        if shm is None:
            # Allocate a fresh block big enough for every element.
            nbytes = ceil(prod(shape) * _bytes_per_element(dtype))
            shm = SharedMemory(create=True, size=nbytes)
        self.shm = shm

    def __str__(self):
        block = self.shm
        return (
            f"NDArray("
            f"dtype='{self.dtype}', "
            f"shape={self.shape}, "
            f"shm='{block.name}' ({block.size}))"
        )

    def ndarray(self):
        """
        Create a NumPy ndarray object for working with the array data.
        No array data is copied; the NumPy array wraps the same SharedMemory.
        Requires the numpy package to be installed.
        """
        try:
            import numpy
        except ModuleNotFoundError:
            raise ImportError("NumPy is not available.")
        flat = numpy.ndarray(prod(self.shape), dtype=self.dtype, buffer=self.shm.buf)
        return flat.reshape(self.shape)

    def __enter__(self) -> "NDArray":
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        # Exiting a `with` block disposes the backing shared memory.
        self.shm.dispose()


class _ApposeJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, SharedMemory):
return {
"appose_type": "shm",
"name": obj.name,
"size": obj.size,
}
if isinstance(obj, NDArray):
return {
"appose_type": "ndarray",
"dtype": obj.dtype,
"shape": obj.shape,
"shm": obj.shm,
}
return super().default(obj)


def _appose_object_hook(obj: Dict):
atype = obj.get("appose_type")
if atype == "shm":
# Attach to existing shared memory block.
return SharedMemory(name=(obj["name"]), size=(obj["size"]))
elif atype == "ndarray":
return NDArray(obj["dtype"], obj["shape"], obj["shm"])
else:
return obj


def _bytes_per_element(dtype: str) -> Union[int, float]:
try:
bits = int(re.sub("[^0-9]", "", dtype))
except ValueError:
raise ValueError(f"Invalid dtype: {dtype}")
return bits / 8


# Whether the current process is an Appose worker. Workers skip
# resource_tracker registration of SharedMemory blocks, leaving
# cleanup to the service process (see SharedMemory.__init__).
_is_worker = False


def _set_worker(value: bool) -> None:
    """
    Record whether the current process is an Appose worker process.

    :param value: True if this process is a worker (set by the worker's
                  main entry point), False otherwise.
    """
    global _is_worker
    _is_worker = value
60 changes: 60 additions & 0 deletions tests/test_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
###
# #%L
# Appose: multi-language interprocess cooperation with shared memory.
# %%
# Copyright (C) 2023 Appose developers.
# %%
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# #L%
###

import appose
from appose.service import TaskStatus

# Script run inside the worker process: reports properties of the `data`
# input, which arrives as an appose.NDArray backed by shared memory.
ndarray_inspect = """
task.outputs["size"] = data.shm.size
task.outputs["dtype"] = data.dtype
task.outputs["shape"] = data.shape
task.outputs["sum"] = sum(v for v in data.shm.buf)
"""


def test_ndarray():
    """Round-trip an NDArray through a worker task via shared memory."""
    env = appose.system()
    with env.python() as service:
        # 2 bytes per uint16 element * 2*20*25 elements.
        with appose.SharedMemory(create=True, size=2 * 2 * 20 * 25) as shm:
            # Construct the data.
            shm.buf[0] = 123
            shm.buf[456] = 78
            shm.buf[1999] = 210
            data = appose.NDArray("uint16", [2, 20, 25], shm)

            # Run the task.
            task = service.task(ndarray_inspect, {"data": data})
            task.wait_for()

            # Validate the execution result.
            assert TaskStatus.COMPLETE == task.status
            assert 2 * 20 * 25 * 2 == task.outputs["size"]
            assert "uint16" == task.outputs["dtype"]
            assert [2, 20, 25] == task.outputs["shape"]
            assert 123 + 78 + 210 == task.outputs["sum"]
Loading

0 comments on commit 2350958

Please sign in to comment.