Add flag to specify number of frames per-chunk in intermediate hdf5 data #367

Merged · 5 commits · Jul 25, 2024
8 changes: 8 additions & 0 deletions httomo/cli.py
@@ -120,6 +120,12 @@ def check(yaml_config: Path, in_data_file: Optional[Path] = None):
default=514,
help="Port on the host the syslog server is running on",
)
@click.option(
"--frames-per-chunk",
Collaborator commented:
@ptim0626 @yousefmoazzam
I wonder if we need more granular control over the chunk sizes here. For instance, would we want a chunk smaller than a whole frame, e.g. (1, 150, 150)? We could pass a tuple instead of the number of frames, with the first value tied to the slicing dimension and the other two to the non-slicing dimensions.
Could that allow more versatile optimisation of write performance, and also of the read speed in Dawn?
Currently the run fails on Wilson if frames-per-chunk > 1.
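
For illustration only, a minimal sketch of what such a tuple-based chunk shape could look like if passed straight to h5py; the file name, dataset shape and chunk values below are hypothetical:

import h5py
import numpy as np

# Hypothetical sub-frame chunk shape as suggested above: 1 along the slicing
# dimension and 150 x 150 within each frame.
custom_chunks = (1, 150, 150)

with h5py.File("intermediate.h5", "w") as f:
    f.create_dataset(
        "/data",
        shape=(1801, 2160, 2560),  # assumed (slicing dim, detector_y, detector_x)
        dtype=np.float32,
        chunks=custom_chunks,  # h5py accepts any chunk shape that fits within the dataset
    )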

ptim0626 (Collaborator) commented on Jul 24, 2024:
Considering performance, instead of decreasing the chunk size, I think increasing it would actually help:
(1) with compression, applying the filter to a single bigger chunk is usually quicker than to multiple smaller chunks, because of the threading overhead of some filters like Blosc;
(2) larger chunks mean less metadata is required (the B-tree used to look up where the chunks are is smaller).

Choosing an optimal chunk size depends on how we access the data, and is also a balance between read and write performance. I would say having one frame per chunk as the smallest unit, with the option of increasing it via --frames-per-chunk, is a good balance of performance and usability (do general users know the implications of using a customised chunk shape?).

The recommended 1 MiB chunk size relates to the default raw data chunk cache (rdcc) in hdf5, which defaults to 1 MiB. As long as we make the rdcc at least the size of one chunk, the exact chunk size doesn't matter much, provided it fits our access pattern. Even so, I don't think it impacts performance a lot, as most of the time (I guess) we just write/read the data once, so caching is not used.
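
A minimal sketch of matching the rdcc to the chunk size when reading chunked intermediate data back with h5py; the file name and frame size are assumptions:

import h5py

frames_per_chunk = 4
frame_nbytes = 2160 * 2560 * 4  # assumed float32 detector frame
chunk_nbytes = frames_per_chunk * frame_nbytes

# Make the raw data chunk cache large enough to hold at least one chunk;
# h5py exposes the cache settings as keyword arguments on h5py.File
# (the default cache size is 1 MiB).
with h5py.File(
    "intermediate.h5",
    "r",
    rdcc_nbytes=max(chunk_nbytes, 1024**2),
    rdcc_nslots=10007,  # number of chunk-cache hash slots, ideally a prime
) as f:
    first_frame = f["/data"][0]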

Collaborator commented:
Thanks @ptim0626, that makes sense. I reckon the general user won't be playing with --frames-per-chunk, so it would be nice to fix a default size that best fits our needs.
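
For reference, a condensed sketch of how --frames-per-chunk maps onto the dataset chunk shape in this PR's save_intermediate_data changes (the helper name and example shape are illustrative, and the fall-back for oversized values is omitted):

from typing import Optional, Tuple

def derive_chunk_shape(
    frames_per_chunk: int,
    slicing_dim: int,
    global_shape: Tuple[int, int, int],
) -> Optional[Tuple[int, int, int]]:
    # 0 means write contiguously (no chunking); otherwise chunk along the
    # slicing dimension and keep the full extent of the other two dimensions.
    if frames_per_chunk == 0:
        return None
    chunk_shape = list(global_shape)
    chunk_shape[slicing_dim] = frames_per_chunk
    return tuple(chunk_shape)

# With the default of 1 frame per chunk and projections as the slicing dimension:
assert derive_chunk_shape(1, 0, (1801, 2160, 2560)) == (1, 2160, 2560)
assert derive_chunk_shape(0, 0, (1801, 2160, 2560)) is None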

type=click.IntRange(0),
default=1,
help="Number of frames per-chunk in intermediate data (0 = write as contiguous)",
)
def run(
in_data_file: Path,
yaml_config: Path,
@@ -134,8 +140,10 @@ def run(
monitor_output: TextIO,
syslog_host: str,
syslog_port: int,
frames_per_chunk: int,
):
"""Run a pipeline defined in YAML on input data."""
httomo.globals.FRAMES_PER_CHUNK = frames_per_chunk

comm = MPI.COMM_WORLD
does_contain_sweep = is_sweep_pipeline(yaml_config)
1 change: 1 addition & 0 deletions httomo/globals.py
@@ -9,3 +9,4 @@
)
SYSLOG_SERVER = "localhost"
SYSLOG_PORT = 514
FRAMES_PER_CHUNK: int = 1
2 changes: 2 additions & 0 deletions httomo/method_wrappers/save_intermediate.py
@@ -66,7 +66,9 @@ def execute(self, block: T) -> T:
data,
global_shape=block.global_shape,
global_index=block.global_index,
slicing_dim=block.slicing_dim,
file=self._file,
frames_per_chunk=httomo.globals.FRAMES_PER_CHUNK,
path="/data",
detector_x=self._loader.detector_x,
detector_y=self._loader.detector_y,
34 changes: 32 additions & 2 deletions httomo/methods.py
@@ -1,10 +1,12 @@
import logging
import pathlib
from typing import Tuple
import numpy as np
import h5py

import httomo
from httomo.runner.dataset import DataSetBlock
from httomo.utils import xp
from httomo.utils import log_once, xp

__all__ = ["calculate_stats", "save_intermediate_data"]

@@ -36,15 +38,43 @@ def save_intermediate_data(
data: np.ndarray,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
slicing_dim: int,
file: h5py.File,
frames_per_chunk: int,
path: str,
detector_x: int,
detector_y: int,
angles: np.ndarray,
) -> None:
"""Saves intermediate data to a file, including auxiliary"""
if frames_per_chunk > data.shape[slicing_dim]:
warn_message = (
f"frames_per_chunk={frames_per_chunk} exceeds number of elements in "
f"slicing dim={slicing_dim} of data with shape {data.shape}. Falling "
"back to 1 frame per-chunk"
)
log_once(warn_message, logging.DEBUG)
frames_per_chunk = 1

if frames_per_chunk > 0:
chunk_shape = [0, 0, 0]
chunk_shape[slicing_dim] = frames_per_chunk
DIMS = [0, 1, 2]
non_slicing_dims = list(set(DIMS) - set([slicing_dim]))
for dim in non_slicing_dims:
chunk_shape[dim] = global_shape[dim]
chunk_shape = tuple(chunk_shape)
else:
chunk_shape = None

# only create if not already present - otherwise return existing dataset
dataset = file.require_dataset(path, global_shape, data.dtype, exact=True)
dataset = file.require_dataset(
path,
global_shape,
data.dtype,
exact=True,
chunks=chunk_shape,
)
_save_dataset_data(dataset, data, global_shape, global_index)
_save_auxiliary_data(file, angles, detector_x, detector_y)

16 changes: 14 additions & 2 deletions tests/method_wrappers/test_save_intermediate.py
@@ -1,5 +1,6 @@
from pathlib import Path
from typing import Tuple
from unittest import mock
import pytest
from pytest_mock import MockerFixture
from httomo.method_wrappers import make_method_wrapper
@@ -20,6 +21,7 @@
def test_save_intermediate(
mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: Path
):
FRAMES_PER_CHUNK = 0
loader: LoaderInterface = mocker.create_autospec(
LoaderInterface, instance=True, detector_x=10, detector_y=20
)
@@ -29,6 +31,8 @@ def save_intermediate_data(
data,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
slicing_dim: int,
frames_per_chunk: int,
file: h5py.File,
path: str,
detector_x: int,
@@ -39,6 +43,8 @@
assert data.shape == dummy_block.shape
assert global_index == (0, 0, 0)
assert global_shape == dummy_block.shape
assert slicing_dim == 0
assert frames_per_chunk == FRAMES_PER_CHUNK
assert Path(file.filename).name == "task1-testpackage-testmethod-XXX.h5"
assert detector_x == 10
assert detector_y == 20
@@ -63,7 +69,8 @@ def save_intermediate_data(
prev_method=prev_method,
)
assert isinstance(wrp, SaveIntermediateFilesWrapper)
res = wrp.execute(dummy_block)
with mock.patch("httomo.globals.FRAMES_PER_CHUNK", FRAMES_PER_CHUNK):
res = wrp.execute(dummy_block)

assert res == dummy_block

@@ -79,6 +86,7 @@ def save_intermediate_data(
data,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
slicing_dim: int,
file: h5py.File,
path: str,
detector_x: int,
@@ -116,6 +124,7 @@ def test_save_intermediate_leaves_gpu_data(
if gpu and not gpu_enabled:
pytest.skip("No GPU available")

FRAMES_PER_CHUNK = 0
loader: LoaderInterface = mocker.create_autospec(
LoaderInterface, instance=True, detector_x=10, detector_y=20
)
@@ -125,7 +134,9 @@ def save_intermediate_data(
data,
global_shape: Tuple[int, int, int],
global_index: Tuple[int, int, int],
slicing_dim: int,
file: h5py.File,
frames_per_chunk: int,
path: str,
detector_x: int,
detector_y: int,
@@ -157,6 +168,7 @@ def save_intermediate_data(
dummy_block.to_gpu()

assert dummy_block.is_gpu == gpu
res = wrp.execute(dummy_block)
with mock.patch("httomo.globals.FRAMES_PER_CHUNK", FRAMES_PER_CHUNK):
res = wrp.execute(dummy_block)

assert res.is_gpu == gpu
133 changes: 133 additions & 0 deletions tests/test_methods.py
@@ -96,7 +96,9 @@ def test_save_intermediate_data(tmp_path: Path):
b2.data,
b2.global_shape,
b2.global_index,
b2.slicing_dim,
file,
frames_per_chunk=0,
path="/data",
detector_x=10,
detector_y=20,
@@ -106,7 +108,9 @@ def test_save_intermediate_data(tmp_path: Path):
b1.data,
b1.global_shape,
b1.global_index,
b1.slicing_dim,
file,
frames_per_chunk=0,
path="/data",
detector_x=10,
detector_y=20,
@@ -169,7 +173,9 @@ def test_save_intermediate_data_mpi(tmp_path: Path):
b2.data,
b2.global_shape,
b2.global_index,
b2.slicing_dim,
file,
frames_per_chunk=0,
path="/data",
detector_x=10,
detector_y=20,
@@ -179,7 +185,9 @@ def test_save_intermediate_data_mpi(tmp_path: Path):
b1.data,
b1.global_shape,
b1.global_index,
b1.slicing_dim,
file,
frames_per_chunk=0,
path="/data",
detector_x=10,
detector_y=20,
@@ -198,3 +206,128 @@
np.testing.assert_array_equal(file["test_file.h5"], [0, 0])
assert "data_dims" in file
np.testing.assert_array_equal(file["data_dims"]["detector_x_y"], [10, 20])


@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5, 1000])
def test_save_intermediate_data_frames_per_chunk(
tmp_path: Path,
frames_per_chunk: int,
):
FILE_NAME = "test_file.h5"
DATA_PATH = "/data"
GLOBAL_SHAPE = (10, 10, 10)
global_data = np.arange(np.prod(GLOBAL_SHAPE), dtype=np.float32).reshape(
GLOBAL_SHAPE
)
aux_data = AuxiliaryData(angles=np.ones(GLOBAL_SHAPE[0], dtype=np.float32))
block = DataSetBlock(
data=global_data,
aux_data=aux_data,
slicing_dim=0,
block_start=0,
chunk_start=0,
chunk_shape=GLOBAL_SHAPE,
global_shape=GLOBAL_SHAPE,
)

with h5py.File(tmp_path / FILE_NAME, "w") as f:
save_intermediate_data(
data=block.data,
global_shape=block.global_shape,
global_index=block.global_index,
slicing_dim=block.slicing_dim,
file=f,
frames_per_chunk=frames_per_chunk,
path=DATA_PATH,
detector_x=block.global_shape[2],
detector_y=block.global_shape[1],
angles=block.angles,
)

# Define the expected chunk shape, based on the `frames_per_chunk` value and the slicing
# dim of the data that was saved
expected_chunk_shape = [0, 0, 0]
expected_chunk_shape[block.slicing_dim] = (
frames_per_chunk if frames_per_chunk != 1000 else 1
)
DIMS = [0, 1, 2]
non_slicing_dims = list(set(DIMS) - set([block.slicing_dim]))
for dim in non_slicing_dims:
expected_chunk_shape[dim] = block.global_shape[dim]

with h5py.File(tmp_path / FILE_NAME, "r") as f:
chunk_shape = f[DATA_PATH].chunks

if frames_per_chunk != 0:
assert chunk_shape == tuple(expected_chunk_shape)
else:
assert chunk_shape is None


@pytest.mark.mpi
@pytest.mark.skipif(
MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test"
)
@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5, 1000])
def test_save_intermediate_data_frames_per_chunk_mpi(
tmp_path: Path,
frames_per_chunk: int,
):
COMM = MPI.COMM_WORLD
tmp_path = COMM.bcast(tmp_path)
FILE_NAME = "test_file.h5"
DATA_PATH = "/data"
SLICING_DIM = 0
GLOBAL_SHAPE = (10, 10, 10)
CHUNK_SIZE = GLOBAL_SHAPE[SLICING_DIM] // 2
global_data = np.arange(np.prod(GLOBAL_SHAPE), dtype=np.float32).reshape(
GLOBAL_SHAPE
)
aux_data = AuxiliaryData(angles=np.ones(GLOBAL_SHAPE[0], dtype=np.float32))
rank_data = (
global_data[:CHUNK_SIZE, :, :]
if COMM.rank == 0
else global_data[CHUNK_SIZE:, :, :]
)
block = DataSetBlock(
data=rank_data,
aux_data=aux_data,
slicing_dim=0,
block_start=0,
chunk_start=0 if COMM.rank == 0 else CHUNK_SIZE,
global_shape=GLOBAL_SHAPE,
chunk_shape=(CHUNK_SIZE, GLOBAL_SHAPE[1], GLOBAL_SHAPE[2]),
)

with h5py.File(tmp_path / FILE_NAME, "w", driver="mpio", comm=COMM) as f:
save_intermediate_data(
data=block.data,
global_shape=block.global_shape,
global_index=block.global_index,
slicing_dim=block.slicing_dim,
file=f,
frames_per_chunk=frames_per_chunk,
path=DATA_PATH,
detector_x=block.global_shape[2],
detector_y=block.global_shape[1],
angles=block.angles,
)

# Define the expected chunk shape, based on the `frames_per_chunk` value and the slicing
# dim of the data that was saved
expected_chunk_shape = [0, 0, 0]
expected_chunk_shape[block.slicing_dim] = (
frames_per_chunk if frames_per_chunk != 1000 else 1
)
DIMS = [0, 1, 2]
non_slicing_dims = list(set(DIMS) - set([block.slicing_dim]))
for dim in non_slicing_dims:
expected_chunk_shape[dim] = block.global_shape[dim]

with h5py.File(tmp_path / FILE_NAME, "r") as f:
chunk_shape = f[DATA_PATH].chunks

if frames_per_chunk != 0:
assert chunk_shape == tuple(expected_chunk_shape)
else:
assert chunk_shape is None