Move store backing calculation out of dataset writer
Prior to this change, the decision of whether the dataset store was
backed by RAM or an hdf5 file was made by the writer. Specifically, the
writer made this decision at the point the first block was written to
the store.

This made a lot of sense when the store backing was determined by the
following (sketched below):
- attempting to create a numpy array with the necessary chunk shape
  (thus allocating the required memory)
- catching any `MemoryError` raised by python
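
For illustration only, a minimal sketch of that allocate-and-catch
approach (the actual code removed by this commit also did an MPI
`Allreduce` so that all processes fall back to a file if any one of
them hits a `MemoryError` - see the diff to `_create_new_data` below):

```python
import numpy as np


def backing_by_allocation(chunk_shape, dtype) -> str:
    """Hypothetical helper: try to allocate the chunk, fall back to a file."""
    try:
        # allocating the full chunk up front is itself the memory check
        _ = np.empty(chunk_shape, dtype=dtype)
        return "RAM"
    except MemoryError:
        return "File"
```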

However, some changes were made in #218 to accommodate the fact that,
when running under SLURM, the job would be OOM-killed by SLURM before
python had a chance to raise a `MemoryError`. Among these changes was
that, instead of allocating a numpy array and catching any
`MemoryError` raised, the required number of bytes was calculated up
front to decide whether the numpy array should be created at all. This
essentially reduced the determination of the store backing to some
simple arithmetic.
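
Roughly, the check introduced in #218 (and removed from
`_create_numpy_data` by this commit - see the diff below) amounted to:

```python
import numpy as np


def chunk_fits_in_memory(unpadded_shape, padded_shape, dtype, memory_limit_bytes) -> bool:
    """Sketch of the arithmetic: a limit of 0 means 'no limit'."""
    itemsize = np.dtype(dtype).itemsize
    unpadded_bytes = int(np.prod(unpadded_shape)) * itemsize
    padded_bytes = int(np.prod(padded_shape)) * itemsize
    return memory_limit_bytes <= 0 or unpadded_bytes + padded_bytes < memory_limit_bytes
```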

That arithmetic could conceivably be done outside of the writer (ie,
prior to the first block being written to the store). One benefit of
this is that the decision about the store backing would no longer be
hidden inside the writer.

Furthermore, the requirement to account for two copies of the chunk in
memory (see the following comment and the thread linked in it for more
details: #401 (comment)) was difficult to fulfil if the writer was the
object deciding the store backing (ie, getting the chunk information
for the next section to the writer of the current section appeared to
be tricky).
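
As a purely illustrative back-of-the-envelope calculation (the shape
below is hypothetical, not from the commit), budgeting for two copies
roughly doubles the memory the backing decision has to account for:

```python
import numpy as np

# hypothetical chunk: 300 projections of 2160 x 2560 float32 pixels
chunk_shape = (300, 2160, 2560)
one_copy = int(np.prod(chunk_shape)) * np.dtype(np.float32).itemsize
print(f"one copy ~ {one_copy / 1024**3:.1f} GiB, two copies ~ {2 * one_copy / 1024**3:.1f} GiB")
# one copy ~ 6.2 GiB, two copies ~ 12.4 GiB
```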

With the above in mind, the calculation of whether the store should be
backed by RAM or an hdf5 file has been moved out of the writer and is
now performed by the task runner. The runner determines the backing of
the store and passes that information to the writer's constructor; the
writer now simply uses whichever backing it is told to use by the
runner.
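
In outline, the new division of responsibility looks roughly like the
following (a sketch only; the function and keyword arguments are those
visible in the `task_runner.py` diff below, while the surrounding
helper is hypothetical):

```python
from mpi4py import MPI

from httomo.data.dataset_store import DataSetStoreWriter
from httomo.runner.dataset_store_backing import determine_store_backing


def make_writer_for_section(source, sections, idx, slicing_dim,
                            comm: MPI.Comm, reslice_dir,
                            memory_limit_bytes: int = 0):
    """Hypothetical helper: the runner computes the backing, the writer receives it."""
    store_backing = determine_store_backing(
        comm=comm,
        sections=sections,
        memory_limit_bytes=memory_limit_bytes,
        dtype=source.dtype,
        global_shape=source.global_shape,
        section_idx=idx,
    )
    # the writer no longer decides anything about its own backing
    return DataSetStoreWriter(
        slicing_dim,
        comm,
        reslice_dir,
        store_backing=store_backing,
    )
```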
yousefmoazzam committed Sep 12, 2024
1 parent 30114ff commit c7ed8b2
Showing 4 changed files with 70 additions and 175 deletions.
35 changes: 7 additions & 28 deletions httomo/data/dataset_store.py
@@ -29,6 +29,7 @@
from httomo.data.padding import extrapolate_after, extrapolate_before
from httomo.runner.auxiliary_data import AuxiliaryData
from httomo.runner.dataset import DataSetBlock
from httomo.runner.dataset_store_backing import DataSetStoreBacking
from httomo.runner.dataset_store_interfaces import (
DataSetSource,
ReadableDataSetSink,
@@ -57,7 +58,7 @@ def __init__(
slicing_dim: Literal[0, 1, 2],
comm: MPI.Comm,
temppath: PathLike,
memory_limit_bytes: int = 0,
store_backing: DataSetStoreBacking = DataSetStoreBacking.RAM,
):
self._slicing_dim = slicing_dim
self._comm = comm
@@ -66,7 +67,7 @@ def __init__(
self._readonly = False
self._h5file: Optional[h5py.File] = None
self._h5filename: Optional[Path] = None
self._memory_limit_bytes: int = memory_limit_bytes
self._store_backing = store_backing

self._data: Optional[Union[np.ndarray, h5py.Dataset]] = None

@@ -79,7 +80,7 @@ def __init__(

@property
def is_file_based(self) -> bool:
return self._h5filename is not None
return self._store_backing is DataSetStoreBacking.File

@property
def filename(self) -> Optional[Path]:
@@ -177,23 +178,12 @@ def _get_global_h5_filename(self) -> PathLike:
return self._h5filename

def _create_new_data(self, block: DataSetBlock):
# reduce memory errors across all processes - if any has a memory problem,
# all should use a file
sendBuffer = np.zeros(1, dtype=bool)
recvBuffer = np.zeros(1, dtype=bool)
try:
if self._store_backing is DataSetStoreBacking.RAM:
self._data = self._create_numpy_data(
unpadded_chunk_shape=block.chunk_shape_unpadded,
padded_chunk_shape=block.chunk_shape,
dtype=block.data.dtype,
)
except MemoryError:
sendBuffer[0] = True

# do a logical or of all the memory errors across the processes
self.comm.Allreduce([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL], MPI.LOR)

if bool(recvBuffer[0]) is True:
else:
log_once(
"Chunk does not fit in memory - using a file-based store",
level=logging.WARNING,
@@ -208,20 +198,9 @@ def _create_new_data(self, block: DataSetBlock):
)

def _create_numpy_data(
self,
unpadded_chunk_shape: Tuple[int, int, int],
padded_chunk_shape: Tuple[int, int, int],
dtype: DTypeLike,
self, unpadded_chunk_shape: Tuple[int, int, int], dtype: DTypeLike
) -> np.ndarray:
"""Convenience method to enable mocking easily"""
unpadded_chunk_bytes = np.prod(unpadded_chunk_shape) * np.dtype(dtype).itemsize
padded_chunk_bytes = np.prod(padded_chunk_shape) * np.dtype(dtype).itemsize
if (
self._memory_limit_bytes > 0
and unpadded_chunk_bytes + padded_chunk_bytes >= self._memory_limit_bytes
):
raise MemoryError("Memory limit reached")

return np.empty(unpadded_chunk_shape, dtype)

def _create_h5_data(
16 changes: 13 additions & 3 deletions httomo/runner/task_runner.py
@@ -9,6 +9,7 @@

import httomo.globals
from httomo.data.dataset_store import DataSetStoreWriter
from httomo.runner.dataset_store_backing import determine_store_backing
from httomo.runner.method_wrapper import MethodWrapper
from httomo.runner.block_split import BlockSplitter
from httomo.runner.dataset import DataSetBlock
@@ -81,7 +82,7 @@ def _sectionize(self) -> List[Section]:
return sections

def _execute_section(self, section: Section, section_index: int = 0):
self._setup_source_sink(section)
self._setup_source_sink(section, section_index)
assert self.source is not None, "Dataset has not been loaded yet"
assert self.sink is not None, "Sink setup failed"

@@ -162,7 +163,7 @@ def _execute_section(self, section: Section, section_index: int = 0):
level=logging.INFO,
)

def _setup_source_sink(self, section: Section):
def _setup_source_sink(self, section: Section, idx: int):
assert self.source is not None, "Dataset has not been loaded yet"

slicing_dim_section: Literal[0, 1] = _get_slicing_dim(section.pattern) - 1 # type: ignore
@@ -173,6 +174,15 @@ def _setup_source_sink(self, section: Section):
assert isinstance(self.sink, ReadableDataSetSink)
self.source = self.sink.make_reader(slicing_dim_section)

store_backing = determine_store_backing(
comm=self.comm,
sections=self._sections,
memory_limit_bytes=self._memory_limit_bytes,
dtype=self.source.dtype,
global_shape=self.source.global_shape,
section_idx=idx,
)

if section.is_last:
# we don't need to store the results - this sink just discards it
self.sink = DummySink(slicing_dim_section)
@@ -181,7 +191,7 @@
slicing_dim_section,
self.comm,
self.reslice_dir,
memory_limit_bytes=self._memory_limit_bytes,
store_backing=store_backing,
)

def _execute_section_block(