Improve intermediate file performance #271

Merged: 15 commits, Aug 12, 2024
Changes from 10 commits
1 change: 1 addition & 0 deletions conda/environment.yml
@@ -22,6 +22,7 @@ dependencies:
- conda-forge::plumbum
- conda-forge::tqdm
- conda-forge::typing_extensions
- conda-forge::zarr
- anaconda::ipython
- conda-forge::tomopy==1.15
- httomo::httomolib==2.0
24 changes: 24 additions & 0 deletions httomo/cli.py
@@ -108,6 +108,22 @@ def check(yaml_config: Path, in_data_file: Optional[Path] = None):
default=sys.stdout,
help="File to store the monitoring output. Defaults to '-', which denotes stdout"
)
@click.option(
"--intermediate-format",
type=click.Choice(["hdf5", "zarr"], case_sensitive=False),
default="hdf5",
help="Write intermediate data in hdf5 or zarr format",
)
@click.option(
"--chunk-intermediate",
is_flag=True,
help="Write intermediate data in chunked uncompressed format",
)
@click.option(
"--compress-intermediate",
is_flag=True,
help="Write intermediate data in chunked format with BLOSC compression applied",
)
@click.option(
"--syslog-host",
type=click.STRING,
@@ -132,10 +148,18 @@ def run(
max_memory: str,
monitor: List[str],
monitor_output: TextIO,
intermediate_format: str,
chunk_intermediate: bool,
compress_intermediate: bool,
syslog_host: str,
syslog_port: int,
):
"""Run a pipeline defined in YAML on input data."""
if compress_intermediate:
chunk_intermediate = True
httomo.globals.INTERMEDIATE_FORMAT = intermediate_format
httomo.globals.CHUNK_INTERMEDIATE = chunk_intermediate
httomo.globals.COMPRESS_INTERMEDIATE = compress_intermediate

comm = MPI.COMM_WORLD
does_contain_sweep = is_sweep_pipeline(yaml_config)
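The new options compose as follows: --compress-intermediate implies --chunk-intermediate, and all three values are copied into httomo.globals so the intermediate-data writer can pick them up. As a rough sketch (not taken from this PR; the data file, pipeline file, output directory and their order are placeholders), an invocation exercising the new flags might look like:

python -m httomo run input_data.nxs pipeline.yaml output_dir/ --intermediate-format zarr --compress-intermediate

Passing --compress-intermediate alone is enough to get chunked, Blosc-compressed intermediate files, since it switches chunking on as well.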
8 changes: 5 additions & 3 deletions httomo/globals.py
@@ -4,8 +4,10 @@
run_out_dir: os.PathLike = Path(".")
gpu_id: int = -1
# maximum slices to use in CPU-only section
MAX_CPU_SLICES: int = (
64 # A some random number which will be overwritten by --max-cpu_slices flag during runtime
)
MAX_CPU_SLICES: int = 64  # an arbitrary default which is overwritten by the --max-cpu_slices flag at runtime
INTERMEDIATE_FORMAT: str = "hdf5"
CHUNK_INTERMEDIATE: bool = False
COMPRESS_INTERMEDIATE: bool = False
SYSLOG_SERVER = "localhost"
SYSLOG_PORT = 514

21 changes: 15 additions & 6 deletions httomo/method_wrappers/save_intermediate.py
@@ -1,6 +1,6 @@
import os
import pathlib
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union
import weakref
from mpi4py.MPI import Comm
import httomo
@@ -12,6 +12,7 @@
from httomo.utils import catchtime, xp

import h5py
import zarr
import numpy as np


@@ -44,9 +45,16 @@ def __init__(self,
if out_dir is None:
out_dir = httomo.globals.run_out_dir
assert out_dir is not None
self._file = h5py.File(f"{out_dir}/{filename}.h5", "w", driver="mpio", comm=comm)
# make sure file gets closed properly
weakref.finalize(self, self._file.close)
self._file: Union[h5py.File, zarr.DirectoryStore]
if httomo.globals.INTERMEDIATE_FORMAT == "hdf5":
self._file = h5py.File(
f"{out_dir}/{filename}.h5", "w", driver="mpio", comm=comm
)
# make sure hdf5 file gets closed properly
weakref.finalize(self, self._file.close)
else:
self._file = zarr.DirectoryStore(path=f"{out_dir}/{filename}.zarr")


def execute(self, block: T) -> T:
# we overwrite the whole execute method here, as we do not need any of the helper
@@ -66,14 +74,15 @@ def execute(self, block: T) -> T:
data,
global_shape=block.global_shape,
global_index=block.global_index,
slicing_dim=block.slicing_dim,
file=self._file,
path="/data",
path="data",
detector_x=self._loader.detector_x,
detector_y=self._loader.detector_y,
angles=block.angles,
)

if block.is_last_in_chunk:
if block.is_last_in_chunk and isinstance(self._file, h5py.File):
self._file.close()

return block
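For the zarr case the wrapper holds a DirectoryStore rather than an open file handle, which is why only the hdf5 branch needs the weakref.finalize close guard and the close-on-last-block call. A minimal standalone sketch of the same pattern, assuming the zarr v2 API used elsewhere in this PR (the store path and shapes are made up):

import numpy as np
import zarr

# A DirectoryStore is just a directory on disk; there is no handle to close,
# unlike an h5py.File opened with the mpio driver.
store = zarr.DirectoryStore("output_dir/task1-intermediate.zarr")

# open_array creates the array on first use and reopens it on subsequent
# calls, which mirrors what require_dataset does for hdf5.
arr = zarr.open_array(
    store=store,
    path="data",
    shape=(180, 128, 160),  # placeholder global shape
    chunks=(1, 128, 160),   # one slice per chunk along the slicing dimension
    dtype=np.float32,
)
arr[0:10] = np.zeros((10, 128, 160), dtype=np.float32)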
129 changes: 120 additions & 9 deletions httomo/methods.py
@@ -1,13 +1,20 @@
import pathlib
from typing import Tuple
from typing import Optional, Tuple, Union
import numpy as np
import h5py
import hdf5plugin
import zarr
from zarr import storage

import httomo
from httomo.runner.dataset import DataSetBlock
from httomo.data import mpiutil
from httomo.utils import xp

__all__ = ["calculate_stats", "save_intermediate_data"]

# save a copy of the original guess_chunk so it can be restored later
ORIGINAL_GUESS_CHUNK = h5py._hl.filters.guess_chunk

def calculate_stats(
data: np.ndarray,
@@ -36,21 +43,72 @@ def save_intermediate_data(
data: np.ndarray,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
file: h5py.File,
slicing_dim: int,
file: Union[h5py.File, zarr.DirectoryStore],
path: str,
detector_x: int,
detector_y: int,
angles: np.ndarray,
) -> None:
"""Saves intermediate data to a file, including auxiliary"""
# only create if not already present - otherwise return existing dataset
dataset = file.require_dataset(path, global_shape, data.dtype, exact=True)
if httomo.globals.CHUNK_INTERMEDIATE:
chunk_shape = [0, 0, 0]
chunk_shape[slicing_dim] = 1
DIMS = [0, 1, 2]
non_slicing_dims = list(set(DIMS) - set([slicing_dim]))
for dim in non_slicing_dims:
chunk_shape[dim] = global_shape[dim]
chunk_shape = tuple(chunk_shape)
else:
chunk_shape = None if isinstance(file, h5py.File) else False

dataset: Union[h5py.Dataset, zarr.Array]
if isinstance(file, h5py.File):
_save_auxiliary_data_hdf5(file, angles, detector_x, detector_y)

# monkey-patch guess_chunk in h5py when compressing, so h5py does not
# auto-chunk and re-apply FILL_TIME_ALLOC over the dcpl built below
compression: Union[dict, hdf5plugin.Blosc]
if httomo.globals.COMPRESS_INTERMEDIATE:
compression = hdf5plugin.Blosc()
h5py._hl.filters.guess_chunk = lambda *args, **kwargs: None
else:
compression = {}
h5py._hl.filters.guess_chunk = ORIGINAL_GUESS_CHUNK

# create a dataset creation property list
dcpl = _dcpl_fill_never(chunk_shape, global_shape)

# only create if not already present - otherwise return existing dataset
dataset = file.require_dataset(
path,
global_shape,
data.dtype,
exact=True,
chunks=None, # set in dcpl
**compression,
dcpl=dcpl,
)
else:
_save_auxiliary_data_zarr(file, angles, detector_x, detector_y)
dataset = zarr.open_array(
store=file,
path=path,
shape=global_shape,
chunks=chunk_shape, # type: ignore
dtype=data.dtype,
compressor=(
storage.default_compressor
if httomo.globals.COMPRESS_INTERMEDIATE
else None
), # type: ignore
)

_save_dataset_data(dataset, data, global_shape, global_index)
_save_auxiliary_data(file, angles, detector_x, detector_y)


def _save_dataset_data(
dataset: h5py.Dataset,
dataset: Union[h5py.Dataset, zarr.Array],
data: np.ndarray,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
@@ -62,18 +120,71 @@ def _save_dataset_data(
assert stop[1] <= dataset.shape[1]
assert stop[2] <= dataset.shape[2]
assert dataset.shape == global_shape
if isinstance(dataset, h5py.Dataset) and httomo.globals.COMPRESS_INTERMEDIATE:
# Write operations must be collective when applying compression, see
# https://github.com/h5py/h5py/issues/1564
with dataset.collective:
dataset[start[0] : stop[0], start[1] : stop[1], start[2] : stop[2]] = data
return

dataset[start[0] : stop[0], start[1] : stop[1], start[2] : stop[2]] = data


def _save_auxiliary_data(
file: h5py.File, angles: np.ndarray, detector_x: int, detector_y: int
def _save_auxiliary_data_hdf5(
file: h5py.File,
angles: np.ndarray,
detector_x: int,
detector_y: int,
):
# only save if not there yet
if "/angles" in file:
return

file.create_dataset("/angles", data=angles)
file.create_dataset("angles", data=angles)

file_name = pathlib.Path(file.filename).name
file.create_dataset(file_name, data=[0, 0])
g1 = file.create_group("data_dims")
g1.create_dataset("detector_x_y", data=[detector_x, detector_y])


def _save_auxiliary_data_zarr(
store: zarr.DirectoryStore,
angles: np.ndarray,
detector_x: int,
detector_y: int,
):
if mpiutil.comm.rank != 0:
return

zarr.save(store=store, path="angles", data=angles)
zarr.save(
store=store, path="detector_x_y", detector_x=detector_x, detector_y=detector_y
)


def _dcpl_fill_never(
chunk_shape: Optional[Tuple[int, int, int]],
shape: Tuple[int, int, int],
) -> h5py.h5p.PropDCID:
"""Create a dcpl with specified chunk shape and never fill value."""
# validate chunk shape (basically a copy from h5py)
if isinstance(chunk_shape, int) and not isinstance(chunk_shape, bool):
chunk_shape = (chunk_shape,)
if isinstance(chunk_shape, tuple) and any(
chunk > dim for dim, chunk in zip(shape, chunk_shape)
if dim is not None):
errmsg = ("Chunk shape must not be greater than data shape in any "
f"dimension. {chunk_shape} is not compatible with {shape}."
)
raise ValueError(errmsg)

# dcpl initialisation
dcpl = h5py.h5p.create(h5py.h5p.DATASET_CREATE)

# only set a chunked layout when a chunk shape was requested
if chunk_shape is not None:
    dcpl.set_chunk(chunk_shape)

# we are not going to resize the dataset
dcpl.set_fill_time(h5py.h5d.FILL_TIME_NEVER)

return dcpl
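The chunking rule used above is one slice along the slicing dimension and the full extent on the remaining dimensions, so each block written by a rank covers whole chunks. A standalone restatement of that rule with made-up shapes (not code from this PR):

from typing import Tuple

def slice_chunk_shape(
    global_shape: Tuple[int, int, int], slicing_dim: int
) -> Tuple[int, int, int]:
    # Full extent on the non-slicing dims, a single slice along slicing_dim.
    chunk = list(global_shape)
    chunk[slicing_dim] = 1
    return tuple(chunk)

# projection-sliced data: one projection per chunk
assert slice_chunk_shape((1801, 128, 160), slicing_dim=0) == (1, 128, 160)
# sinogram-sliced data: one sinogram per chunk
assert slice_chunk_shape((1801, 128, 160), slicing_dim=1) == (1801, 1, 160)

The custom dcpl sets FILL_TIME_NEVER, presumably because every chunk of the intermediate dataset is written exactly once, so pre-filling chunks on allocation (h5py's default FILL_TIME_ALLOC behaviour for chunked datasets) would only add I/O.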
6 changes: 5 additions & 1 deletion tests/method_wrappers/test_save_intermediate.py
@@ -29,6 +29,7 @@ def save_intermediate_data(
data,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
slicing_dim: int,
file: h5py.File,
path: str,
detector_x: int,
@@ -39,10 +40,11 @@
assert data.shape == dummy_block.shape
assert global_index == (0, 0, 0)
assert global_shape == dummy_block.shape
assert slicing_dim == 0
assert Path(file.filename).name == "task1-testpackage-testmethod-XXX.h5"
assert detector_x == 10
assert detector_y == 20
assert path == "/data"
assert path == "data"

mocker.patch("importlib.import_module", return_value=FakeModule)
prev_method = mocker.create_autospec(
@@ -79,6 +81,7 @@ def save_intermediate_data(
data,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
slicing_dim: int,
file: h5py.File,
path: str,
detector_x: int,
@@ -125,6 +128,7 @@ def save_intermediate_data(
data,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
slicing_dim: int,
file: h5py.File,
path: str,
detector_x: int,
4 changes: 4 additions & 0 deletions tests/test_methods.py
@@ -96,6 +96,7 @@ def test_save_intermediate_data(tmp_path: Path):
b2.data,
b2.global_shape,
b2.global_index,
b2.slicing_dim,
file,
path="/data",
detector_x=10,
@@ -106,6 +107,7 @@ def test_save_intermediate_data(tmp_path: Path):
b1.data,
b1.global_shape,
b1.global_index,
b1.slicing_dim,
file,
path="/data",
detector_x=10,
@@ -169,6 +171,7 @@ def test_save_intermediate_data_mpi(tmp_path: Path):
b2.data,
b2.global_shape,
b2.global_index,
b2.slicing_dim,
file,
path="/data",
detector_x=10,
@@ -179,6 +182,7 @@ def test_save_intermediate_data_mpi(tmp_path: Path):
b1.data,
b1.global_shape,
b1.global_index,
b1.slicing_dim,
file,
path="/data",
detector_x=10,