|
30 | 30 | import json
|
31 | 31 | import re
|
32 | 32 | from math import ceil, prod
|
| 33 | +from multiprocessing import resource_tracker |
33 | 34 | from multiprocessing.shared_memory import SharedMemory
|
34 | 35 | from typing import Any, Dict, Sequence, Union
|
35 | 36 |
|
@@ -110,11 +111,35 @@ def default(self, obj):
|
110 | 111 | }
|
111 | 112 | return super().default(obj)
|
112 | 113 |
|
113 |
| - |
| 114 | +foo = True |
114 | 115 | def _appose_object_hook(obj: Dict):
|
115 | 116 | atype = obj.get("appose_type")
|
116 | 117 | if atype == "shm":
|
117 |
| - return SharedMemory(name=(obj["name"]), size=(obj["size"])) |
| 118 | + # Attach to existing shared memory block. |
| 119 | + shm = SharedMemory(name=(obj["name"]), size=(obj["size"])) |
| 120 | + |
| 121 | + # HACK: Work around the Python resource trackers's vigorous effort to |
| 122 | + # garbage collect all shared memory blocks after all known references |
| 123 | + # are done using them. Unfortunately, due to how Appose invokes Python |
| 124 | + # worker processes, the resource tracker does not know about the |
| 125 | + # reference from the service process, and overeagerly eats the memory |
| 126 | + # when the worker shuts down. To avoid this issue, we tell the worker |
| 127 | + # process's associated resource tracker to ignore this particular |
| 128 | + # shared memory block, instead trusting the process that actually |
| 129 | + # created it to clean up when finished. |
| 130 | + # |
| 131 | + # This logic could go wrong if the worker process creates a |
| 132 | + # SharedMemory, returns it to the service process as an output, and |
| 133 | + # then the service process subsequently passes it back to the worker as |
| 134 | + # an input argument: such a sequence of events would lead to the named |
| 135 | + # shared memory in question being unregistered with the resource |
| 136 | + # tracker here, even though it in fact *was* this worker process that |
| 137 | + # created the shared memory block earlier... but I don't have a clear |
| 138 | + # idea for how to avoid this difficulty at the moment. |
| 139 | + name = "/" + shm.name |
| 140 | + resource_tracker.unregister(name, "shared_memory") |
| 141 | + |
| 142 | + return shm |
118 | 143 | elif atype == "ndarray":
|
119 | 144 | return NDArray(obj["dtype"], obj["shape"], obj["shm"])
|
120 | 145 | else:
|
|
0 commit comments