Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SharedMemory and ndarray serialization #1

Merged
merged 18 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ name: appose-dev
channels:
- conda-forge
- defaults
- forklift
dependencies:
- python >= 3.10
# Developer tools
Expand Down
1 change: 1 addition & 0 deletions src/appose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def task_listener(event):
from pathlib import Path

from .environment import Builder, Environment
from .types import NDArray, SharedMemory # noqa: F401


def base(directory: Path) -> Builder:
Expand Down
16 changes: 11 additions & 5 deletions src/appose/python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
###

"""
TODO
The Appose worker for running Python scripts.

Like all Appose workers, this program conforms to the Appose worker process
contract, meaning it accepts requests on stdin and produces responses on
stdout, both formatted according to Appose's assumptions.

For details, see the Appose README:
https://github.com/apposed/appose/blob/-/README.md#workers
"""

import ast
Expand All @@ -39,7 +46,7 @@

# NB: Avoid relative imports so that this script can be run standalone.
from appose.service import RequestType, ResponseType
from appose.types import Args, decode, encode
from appose.types import Args, _set_worker, decode, encode


class Task:
Expand Down Expand Up @@ -80,7 +87,6 @@ def _start(self, script: str, inputs: Optional[Args]) -> None:
def execute_script():
# Populate script bindings.
binding = {"task": self}
# TODO: Magically convert shared memory image inputs.
if inputs is not None:
binding.update(inputs)

Expand Down Expand Up @@ -156,6 +162,8 @@ def _respond(self, response_type: ResponseType, args: Optional[Args]) -> None:


def main() -> None:
_set_worker(True)

tasks = {}

while True:
Expand All @@ -181,8 +189,6 @@ def main() -> None:
case RequestType.CANCEL:
task = tasks.get(uuid)
if task is None:
# TODO: proper logging
# Maybe should stdout the error back to Appose calling process.
print(f"No such task: {uuid}", file=sys.stderr)
continue
task.cancel_requested = True
Expand Down
162 changes: 159 additions & 3 deletions src/appose/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,170 @@
###

import json
from typing import Any, Dict
import re
from math import ceil, prod
from multiprocessing import resource_tracker, shared_memory
from typing import Any, Dict, Sequence, Union

Args = Dict[str, Any]


class SharedMemory(shared_memory.SharedMemory):
"""
An enhanced version of Python's multiprocessing.shared_memory.SharedMemory
class which can be used with a `with` statement. When the program flow
exits the `with` block, this class's `dispose()` method will be invoked,
which might call `close()` or `unlink()` depending on the value of its
`unlink_on_dispose` flag.
"""

def __init__(self, name: str = None, create: bool = False, size: int = 0):
super().__init__(name=name, create=create, size=size)
self._unlink_on_dispose = create
if _is_worker:
# HACK: Remove this shared memory block from the resource_tracker,
# which wants to clean up shared memory blocks after all known
# references are done using them.
#
# There is one resource_tracker per Python process, and they will
# each try to delete shared memory blocks known to them when they
# are shutting down, even when other processes still need them.
#
# As such, the rule Appose follows is: let the service process
# always handle cleanup of shared memory blocks, regardless of
# which process initially allocated it.
resource_tracker.unregister(self._name, "shared_memory")

def unlink_on_dispose(self, value: bool) -> None:
"""
Set whether the `unlink()` method should be invoked to destroy
the shared memory block when the `dispose()` method is called.

Note: dispose() is the method called when exiting a `with` block.

By default, shared memory objects constructed with `create=True`
will behave this way, whereas shared memory objects constructed
with `create=False` will not. But this method allows to override
the behavior.
"""
self._unlink_on_dispose = value

def dispose(self) -> None:
if self._unlink_on_dispose:
self.unlink()
else:
self.close()

def __enter__(self) -> "SharedMemory":
return self

def __exit__(self, exc_type, exc_value, exc_tb) -> None:
self.dispose()


def encode(data: Args) -> str:
return json.dumps(data)
return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":"))


def decode(the_json: str) -> Args:
return json.loads(the_json)
return json.loads(the_json, object_hook=_appose_object_hook)


class NDArray:
"""
Data structure for a multi-dimensional array.
The array contains elements of a data type, arranged in
a particular shape, and flattened into SharedMemory.
"""

def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None):
"""
Create an NDArray.
:param dtype: The type of the data elements; e.g. int8, uint8, float32, float64.
:param shape: The dimensional extents; e.g. a stack of 7 image planes
with resolution 512x512 would have shape [7, 512, 512].
:param shm: The SharedMemory containing the array data, or None to create it.
"""
self.dtype = dtype
self.shape = shape
self.shm = (
SharedMemory(
create=True, size=ceil(prod(shape) * _bytes_per_element(dtype))
)
if shm is None
else shm
)

def __str__(self):
return (
f"NDArray("
f"dtype='{self.dtype}', "
f"shape={self.shape}, "
f"shm='{self.shm.name}' ({self.shm.size}))"
)

def ndarray(self):
"""
Create a NumPy ndarray object for working with the array data.
No array data is copied; the NumPy array wraps the same SharedMemory.
Requires the numpy package to be installed.
"""
try:
import numpy

return numpy.ndarray(
prod(self.shape), dtype=self.dtype, buffer=self.shm.buf
).reshape(self.shape)
except ModuleNotFoundError:
raise ImportError("NumPy is not available.")

def __enter__(self) -> "NDArray":
return self

def __exit__(self, exc_type, exc_value, exc_tb) -> None:
self.shm.dispose()


class _ApposeJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, SharedMemory):
return {
"appose_type": "shm",
"name": obj.name,
"size": obj.size,
}
if isinstance(obj, NDArray):
return {
"appose_type": "ndarray",
"dtype": obj.dtype,
"shape": obj.shape,
"shm": obj.shm,
}
return super().default(obj)


def _appose_object_hook(obj: Dict):
atype = obj.get("appose_type")
if atype == "shm":
# Attach to existing shared memory block.
return SharedMemory(name=(obj["name"]), size=(obj["size"]))
elif atype == "ndarray":
return NDArray(obj["dtype"], obj["shape"], obj["shm"])
else:
return obj


def _bytes_per_element(dtype: str) -> Union[int, float]:
try:
bits = int(re.sub("[^0-9]", "", dtype))
except ValueError:
raise ValueError(f"Invalid dtype: {dtype}")
return bits / 8


_is_worker = False


def _set_worker(value: bool) -> None:
global _is_worker
_is_worker = value
60 changes: 60 additions & 0 deletions tests/test_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
###
# #%L
# Appose: multi-language interprocess cooperation with shared memory.
# %%
# Copyright (C) 2023 Appose developers.
# %%
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# #L%
###

import appose
from appose.service import TaskStatus

ndarray_inspect = """
task.outputs["size"] = data.shm.size
task.outputs["dtype"] = data.dtype
task.outputs["shape"] = data.shape
task.outputs["sum"] = sum(v for v in data.shm.buf)
"""


def test_ndarray():
env = appose.system()
with env.python() as service:
with appose.SharedMemory(create=True, size=2 * 2 * 20 * 25) as shm:
# Construct the data.
shm.buf[0] = 123
shm.buf[456] = 78
shm.buf[1999] = 210
data = appose.NDArray("uint16", [2, 20, 25], shm)

# Run the task.
task = service.task(ndarray_inspect, {"data": data})
task.wait_for()

# Validate the execution result.
assert TaskStatus.COMPLETE == task.status
assert 2 * 20 * 25 * 2 == task.outputs["size"]
assert "uint16" == task.outputs["dtype"]
assert [2, 20, 25] == task.outputs["shape"]
assert 123 + 78 + 210 == task.outputs["sum"]
Loading
Loading