Move store backing calculation out of dataset writer #441

Merged
merged 6 commits on Sep 12, 2024
35 changes: 7 additions & 28 deletions httomo/data/dataset_store.py
@@ -29,6 +29,7 @@
from httomo.data.padding import extrapolate_after, extrapolate_before
from httomo.runner.auxiliary_data import AuxiliaryData
from httomo.runner.dataset import DataSetBlock
+from httomo.runner.dataset_store_backing import DataSetStoreBacking
from httomo.runner.dataset_store_interfaces import (
    DataSetSource,
    ReadableDataSetSink,
@@ -57,7 +58,7 @@ def __init__(
        slicing_dim: Literal[0, 1, 2],
        comm: MPI.Comm,
        temppath: PathLike,
-        memory_limit_bytes: int = 0,
+        store_backing: DataSetStoreBacking = DataSetStoreBacking.RAM,
    ):
        self._slicing_dim = slicing_dim
        self._comm = comm
@@ -66,7 +67,7 @@ def __init__(
        self._readonly = False
        self._h5file: Optional[h5py.File] = None
        self._h5filename: Optional[Path] = None
-        self._memory_limit_bytes: int = memory_limit_bytes
+        self._store_backing = store_backing

        self._data: Optional[Union[np.ndarray, h5py.Dataset]] = None

@@ -79,7 +80,7 @@ def __init__(

    @property
    def is_file_based(self) -> bool:
-        return self._h5filename is not None
+        return self._store_backing is DataSetStoreBacking.File

    @property
    def filename(self) -> Optional[Path]:
@@ -177,23 +178,12 @@ def _get_global_h5_filename(self) -> PathLike:
        return self._h5filename

    def _create_new_data(self, block: DataSetBlock):
-        # reduce memory errors across all processes - if any has a memory problem,
-        # all should use a file
-        sendBuffer = np.zeros(1, dtype=bool)
-        recvBuffer = np.zeros(1, dtype=bool)
-        try:
+        if self._store_backing is DataSetStoreBacking.RAM:
            self._data = self._create_numpy_data(
                unpadded_chunk_shape=block.chunk_shape_unpadded,
-                padded_chunk_shape=block.chunk_shape,
                dtype=block.data.dtype,
            )
-        except MemoryError:
-            sendBuffer[0] = True
-
-        # do a logical or of all the memory errors across the processes
-        self.comm.Allreduce([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL], MPI.LOR)
-
-        if bool(recvBuffer[0]) is True:
+        else:
            log_once(
                "Chunk does not fit in memory - using a file-based store",
                level=logging.WARNING,
@@ -208,20 +198,9 @@ def _create_new_data(self, block: DataSetBlock):
            )

    def _create_numpy_data(
-        self,
-        unpadded_chunk_shape: Tuple[int, int, int],
-        padded_chunk_shape: Tuple[int, int, int],
-        dtype: DTypeLike,
+        self, unpadded_chunk_shape: Tuple[int, int, int], dtype: DTypeLike
    ) -> np.ndarray:
        """Convenience method to enable mocking easily"""
-        unpadded_chunk_bytes = np.prod(unpadded_chunk_shape) * np.dtype(dtype).itemsize
-        padded_chunk_bytes = np.prod(padded_chunk_shape) * np.dtype(dtype).itemsize
-        if (
-            self._memory_limit_bytes > 0
-            and unpadded_chunk_bytes + padded_chunk_bytes >= self._memory_limit_bytes
-        ):
-            raise MemoryError("Memory limit reached")
-
        return np.empty(unpadded_chunk_shape, dtype)

    def _create_h5_data(
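Note: with this change the writer no longer probes memory itself. The caller passes the decision in via `store_backing`, and `is_file_based` reports the chosen enum variant up front, before any data has been written, instead of checking whether an HDF5 file already exists. A minimal construction sketch, assuming `slicing_dim`, `comm`, and `temppath` are the only other required parameters (the hunk may elide more):

```python
from pathlib import Path

from mpi4py import MPI

from httomo.data.dataset_store import DataSetStoreWriter
from httomo.runner.dataset_store_backing import DataSetStoreBacking

# Force the file-based store; the decision now belongs to the caller.
writer = DataSetStoreWriter(
    slicing_dim=0,
    comm=MPI.COMM_WORLD,
    temppath=Path("/tmp"),
    store_backing=DataSetStoreBacking.File,
)
# Reflects the enum directly, before any data has been written.
assert writer.is_file_based
```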
178 changes: 178 additions & 0 deletions httomo/runner/dataset_store_backing.py
@@ -0,0 +1,178 @@
from enum import Enum
from typing import Callable, List, ParamSpec, Tuple

import numpy as np
from numpy.typing import DTypeLike
from mpi4py import MPI

from httomo.runner.section import Section, determine_section_padding
from httomo.utils import _get_slicing_dim, make_3d_shape_from_shape


def calculate_section_chunk_shape(
    comm: MPI.Comm,
    global_shape: Tuple[int, int, int],
    slicing_dim: int,
    padding: Tuple[int, int],
) -> Tuple[int, int, int]:
    """
    Calculate chunk shape (w/ or w/o padding) for a section.
    """
    start = round((global_shape[slicing_dim] / comm.size) * comm.rank)
    stop = round((global_shape[slicing_dim] / comm.size) * (comm.rank + 1))
    section_slicing_dim_len = stop - start
    shape = list(global_shape)
    shape[slicing_dim] = section_slicing_dim_len + padding[0] + padding[1]
    return make_3d_shape_from_shape(shape)


def calculate_section_chunk_bytes(
    chunk_shape: Tuple[int, int, int],
    dtype: DTypeLike,
    section: Section,
) -> int:
    """
    Calculate the number of bytes in the section output chunk that is written to the store.
    This accounts for the data's non-slicing dims changing during processing, which changes
    the chunk shape for the section and thus affects the number of bytes in the chunk.
    """
    slicing_dim = _get_slicing_dim(section.pattern) - 1
    non_slice_dims_list = list(chunk_shape)
    non_slice_dims_list.pop(slicing_dim)
    non_slice_dims = (non_slice_dims_list[0], non_slice_dims_list[1])

    for method in section.methods:
        if method.memory_gpu is None:
            continue
        non_slice_dims = method.calculate_output_dims(non_slice_dims)

    return int(
        np.prod(non_slice_dims) * chunk_shape[slicing_dim] * np.dtype(dtype).itemsize
    )


class DataSetStoreBacking(Enum):
    RAM = 1
    File = 2


P = ParamSpec("P")


def _reduce_decorator_factory(
    comm: MPI.Comm,
) -> Callable[[Callable[P, DataSetStoreBacking]], Callable[P, DataSetStoreBacking]]:
    """
    Generate decorator for store-backing calculator function that will use the given MPI
    communicator for the reduce operation.
    """

    def reduce_decorator(
        func: Callable[P, DataSetStoreBacking]
    ) -> Callable[P, DataSetStoreBacking]:
        """
        Decorator for store-backing calculator function.
        """

        def wrapper(*args: P.args, **kwargs: P.kwargs) -> DataSetStoreBacking:
            """
            Perform store-backing calculation across all MPI processes and reduce.
            """
            # reduce store backing enum variant across all processes - if any has
            # `File` variant, all should use a file
            send_buffer = np.zeros(1, dtype=bool)
            recv_buffer = np.zeros(1, dtype=bool)
            store_backing = func(*args, **kwargs)

            if store_backing is DataSetStoreBacking.File:
                send_buffer[0] = True

            # do a logical or of all the enum variants across the processes
            comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR)

            if bool(recv_buffer[0]) is True:
                return DataSetStoreBacking.File

            return DataSetStoreBacking.RAM

        return wrapper

    return reduce_decorator


def _non_last_section_in_pipeline(
    memory_limit_bytes: int,
    write_chunk_bytes: int,
    read_chunk_bytes: int,
) -> DataSetStoreBacking:
    """
    Calculate backing of dataset store for non-last sections in pipeline
    """
    if (
        memory_limit_bytes > 0
        and write_chunk_bytes + read_chunk_bytes >= memory_limit_bytes
    ):
        return DataSetStoreBacking.File

    return DataSetStoreBacking.RAM


def _last_section_in_pipeline(
    memory_limit_bytes: int,
    write_chunk_bytes: int,
) -> DataSetStoreBacking:
    """
    Calculate backing of dataset store for last section in pipeline
    """
    if memory_limit_bytes > 0 and write_chunk_bytes >= memory_limit_bytes:
        return DataSetStoreBacking.File

    return DataSetStoreBacking.RAM


def determine_store_backing(
    comm: MPI.Comm,
    sections: List[Section],
    memory_limit_bytes: int,
    dtype: DTypeLike,
    global_shape: Tuple[int, int, int],
    section_idx: int,
) -> DataSetStoreBacking:
    reduce_decorator = _reduce_decorator_factory(comm)

    # Get chunk shape input to section
    current_chunk_shape = calculate_section_chunk_shape(
        comm=comm,
        global_shape=global_shape,
        slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1,
        padding=(0, 0),
    )

    # Get the number of bytes in the input chunk to the section w/ potential modifications
    # to the non-slicing dims
    current_chunk_bytes = calculate_section_chunk_bytes(
        chunk_shape=current_chunk_shape,
        dtype=dtype,
        section=sections[section_idx],
    )

    if section_idx == len(sections) - 1:
        return reduce_decorator(_last_section_in_pipeline)(
            memory_limit_bytes=memory_limit_bytes,
            write_chunk_bytes=current_chunk_bytes,
        )

    # Get chunk shape created by reader of section `n+1`, that will add padding to the
    # chunk shape written by the writer of section `n`
    next_chunk_shape = calculate_section_chunk_shape(
        comm=comm,
        global_shape=global_shape,
        slicing_dim=_get_slicing_dim(sections[section_idx + 1].pattern) - 1,
        padding=determine_section_padding(sections[section_idx + 1]),
    )
    next_chunk_bytes = int(np.prod(next_chunk_shape) * np.dtype(dtype).itemsize)
    return reduce_decorator(_non_last_section_in_pipeline)(
        memory_limit_bytes=memory_limit_bytes,
        write_chunk_bytes=current_chunk_bytes,
        read_chunk_bytes=next_chunk_bytes,
    )
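The decision decomposes into pure per-rank checks plus one MPI reduction, which makes it easy to exercise in isolation. A minimal sketch for the last section of a pipeline, assuming a hypothetical `(1801, 2160, 2560)` float32 dataset and a 20 GiB limit; it calls the private helper directly purely for illustration, since building the `List[Section]` that `determine_store_backing` expects is elided here:

```python
import numpy as np
from mpi4py import MPI

from httomo.runner.dataset_store_backing import (
    DataSetStoreBacking,
    _last_section_in_pipeline,
    calculate_section_chunk_shape,
)

comm = MPI.COMM_WORLD
global_shape = (1801, 2160, 2560)  # hypothetical (angles, detector_y, detector_x)

# Chunk this rank would write for a section slicing along dim 0, no padding.
chunk_shape = calculate_section_chunk_shape(
    comm=comm, global_shape=global_shape, slicing_dim=0, padding=(0, 0)
)
write_chunk_bytes = int(np.prod(chunk_shape) * np.dtype(np.float32).itemsize)

# Per-rank decision: RAM unless the written chunk would breach the limit.
local_backing = _last_section_in_pipeline(
    memory_limit_bytes=20 * 1024**3,  # hypothetical 20 GiB limit
    write_chunk_bytes=write_chunk_bytes,
)

# Rank agreement, as the reduce decorator does it: if any rank needs a file,
# every rank falls back to the file-based store.
send_buffer = np.array([local_backing is DataSetStoreBacking.File], dtype=bool)
recv_buffer = np.zeros(1, dtype=bool)
comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR)
backing = DataSetStoreBacking.File if recv_buffer[0] else DataSetStoreBacking.RAM
print(f"rank {comm.rank}: {backing}")
```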
11 changes: 10 additions & 1 deletion httomo/runner/section.py
@@ -1,5 +1,5 @@
import logging
-from typing import Iterator, List, Optional
+from typing import Iterator, List, Optional, Tuple

from httomo.runner.output_ref import OutputRef
from httomo.runner.pipeline import Pipeline
@@ -142,3 +142,12 @@ def _set_method_patterns(sections: List[Section]):
    for s in sections:
        for m in s:
            m.pattern = s.pattern
+
+
+def determine_section_padding(section: Section) -> Tuple[int, int]:
+    # NOTE: Assumes that only one method with padding will be in a section, which is
+    # consistent with the assumptions made by `sectionize()`
+    for method in section.methods:
+        if method.padding:
+            return method.calculate_padding()
+    return (0, 0)
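`determine_section_padding` encodes the same single-padding-method assumption that `sectionize()` makes: the first method that declares padding wins, and a section with no padded methods yields `(0, 0)`. A toy illustration of that rule with hypothetical stand-in objects (not httomo's real method wrappers):

```python
from typing import List, Optional, Tuple


class FakeMethod:
    """Stand-in exposing just the `padding` flag and `calculate_padding()`."""

    def __init__(self, pad: Optional[Tuple[int, int]] = None):
        self.padding = pad is not None
        self._pad = pad if pad is not None else (0, 0)

    def calculate_padding(self) -> Tuple[int, int]:
        return self._pad


def first_method_padding(methods: List[FakeMethod]) -> Tuple[int, int]:
    # Same rule as determine_section_padding: first padded method wins.
    for method in methods:
        if method.padding:
            return method.calculate_padding()
    return (0, 0)


assert first_method_padding([FakeMethod(), FakeMethod((2, 2))]) == (2, 2)
assert first_method_padding([FakeMethod()]) == (0, 0)
```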
49 changes: 16 additions & 33 deletions httomo/runner/task_runner.py
@@ -9,6 +9,7 @@

import httomo.globals
from httomo.data.dataset_store import DataSetStoreWriter
+from httomo.runner.dataset_store_backing import determine_store_backing
from httomo.runner.method_wrapper import MethodWrapper
from httomo.runner.block_split import BlockSplitter
from httomo.runner.dataset import DataSetBlock
@@ -29,7 +30,6 @@
    log_exception,
    log_once,
    log_rank,
-    make_3d_shape_from_shape,
)
import numpy as np

@@ -56,12 +56,13 @@ def __init__(

        self._memory_limit_bytes = memory_limit_bytes

+        self._sections = self._sectionize()
+
    def execute(self) -> None:
        with catchtime() as t:
-            sections = self._sectionize()
-
            self._prepare()
-            for i, section in enumerate(sections):
+            for i, section in enumerate(self._sections):
                self._execute_section(section, i)
                gpumem_cleanup()

@@ -81,7 +82,7 @@ def _sectionize(self) -> List[Section]:
        return sections

    def _execute_section(self, section: Section, section_index: int = 0):
-        self._setup_source_sink(section)
+        self._setup_source_sink(section, section_index)
        assert self.source is not None, "Dataset has not been loaded yet"
        assert self.sink is not None, "Sink setup failed"

@@ -162,7 +163,7 @@ def _execute_section(self, section: Section, section_index: int = 0):
            level=logging.INFO,
        )

-    def _setup_source_sink(self, section: Section):
+    def _setup_source_sink(self, section: Section, idx: int):
        assert self.source is not None, "Dataset has not been loaded yet"

        slicing_dim_section: Literal[0, 1] = _get_slicing_dim(section.pattern) - 1  # type: ignore
@@ -173,6 +174,15 @@ def _setup_source_sink(self, section: Section, idx: int):
            assert isinstance(self.sink, ReadableDataSetSink)
            self.source = self.sink.make_reader(slicing_dim_section)

+        store_backing = determine_store_backing(
+            comm=self.comm,
+            sections=self._sections,
+            memory_limit_bytes=self._memory_limit_bytes,
+            dtype=self.source.dtype,
+            global_shape=self.source.global_shape,
+            section_idx=idx,
+        )
+
        if section.is_last:
            # we don't need to store the results - this sink just discards it
            self.sink = DummySink(slicing_dim_section)
@@ -181,7 +191,7 @@ def _setup_source_sink(self, section: Section, idx: int):
                slicing_dim_section,
                self.comm,
                self.reslice_dir,
-                memory_limit_bytes=self._memory_limit_bytes,
+                store_backing=store_backing,
            )

    def _execute_section_block(
@@ -352,30 +362,3 @@ def determine_max_slices(self, section: Section, slicing_dim: int):
            non_slice_dims_shape = output_dims

        section.max_slices = min(max_slices_methods)
-
-    def determine_section_padding(self, section: Section) -> Tuple[int, int]:
-        # NOTE: Assumes that only one method with padding will be in a section, which is
-        # consistent with the assumptions made by `section.sectionizer()`
-        for method in section.methods:
-            if method.padding:
-                return method.calculate_padding()
-        return (0, 0)
-
-
-def calculate_next_chunk_shape(
-    comm: MPI.Comm,
-    global_shape: Tuple[int, int, int],
-    next_section_slicing_dim: int,
-    next_section_padding: Tuple[int, int],
-) -> Tuple[int, int, int]:
-    """
-    Utility function for calculating the chunk shape (including padding) for the next section.
-    """
-    start = round((global_shape[next_section_slicing_dim] / comm.size) * comm.rank)
-    stop = round((global_shape[next_section_slicing_dim] / comm.size) * (comm.rank + 1))
-    next_section_slicing_dim_len = stop - start
-    shape = list(global_shape)
-    shape[next_section_slicing_dim] = (
-        next_section_slicing_dim_len + next_section_padding[0] + next_section_padding[1]
-    )
-    return make_3d_shape_from_shape(shape)