From d9b689d2a8b57290544cacc88149058e0337d5a1 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Wed, 3 Apr 2024 16:00:10 +0100 Subject: [PATCH 1/5] Add flag to write hdf5 intermediate data chunked --- httomo/cli.py | 7 +++++++ httomo/globals.py | 1 + httomo/method_wrappers/save_intermediate.py | 1 + httomo/methods.py | 21 ++++++++++++++++++- .../method_wrappers/test_save_intermediate.py | 4 ++++ tests/test_methods.py | 4 ++++ 6 files changed, 37 insertions(+), 1 deletion(-) diff --git a/httomo/cli.py b/httomo/cli.py index 2c4765ed9..bd2cc86fb 100644 --- a/httomo/cli.py +++ b/httomo/cli.py @@ -120,6 +120,11 @@ def check(yaml_config: Path, in_data_file: Optional[Path] = None): default=514, help="Port on the host the syslog server is running on", ) +@click.option( + "--chunk-intermediate", + is_flag=True, + help="Write intermediate data in chunked format", +) def run( in_data_file: Path, yaml_config: Path, @@ -134,8 +139,10 @@ def run( monitor_output: TextIO, syslog_host: str, syslog_port: int, + chunk_intermediate: bool, ): """Run a pipeline defined in YAML on input data.""" + httomo.globals.CHUNK_INTERMEDIATE = chunk_intermediate comm = MPI.COMM_WORLD does_contain_sweep = is_sweep_pipeline(yaml_config) diff --git a/httomo/globals.py b/httomo/globals.py index 2682cdd50..915538f04 100644 --- a/httomo/globals.py +++ b/httomo/globals.py @@ -9,3 +9,4 @@ ) SYSLOG_SERVER = "localhost" SYSLOG_PORT = 514 +CHUNK_INTERMEDIATE: bool = False diff --git a/httomo/method_wrappers/save_intermediate.py b/httomo/method_wrappers/save_intermediate.py index f4d1003d1..af76bafaf 100644 --- a/httomo/method_wrappers/save_intermediate.py +++ b/httomo/method_wrappers/save_intermediate.py @@ -66,6 +66,7 @@ def execute(self, block: T) -> T: data, global_shape=block.global_shape, global_index=block.global_index, + slicing_dim=block.slicing_dim, file=self._file, path="/data", detector_x=self._loader.detector_x, diff --git a/httomo/methods.py b/httomo/methods.py index 9685c3e7e..e0ca3871f 100644 --- a/httomo/methods.py +++ b/httomo/methods.py @@ -3,6 +3,7 @@ import numpy as np import h5py +import httomo from httomo.runner.dataset import DataSetBlock from httomo.utils import xp @@ -36,6 +37,7 @@ def save_intermediate_data( data: np.ndarray, global_shape: Tuple[int, int, int], global_index: Tuple[int, int, int], + slicing_dim: int, file: h5py.File, path: str, detector_x: int, @@ -43,8 +45,25 @@ def save_intermediate_data( angles: np.ndarray, ) -> None: """Saves intermediate data to a file, including auxiliary""" + if httomo.globals.CHUNK_INTERMEDIATE: + chunk_shape = [0, 0, 0] + chunk_shape[slicing_dim] = 1 + DIMS = [0, 1, 2] + non_slicing_dims = list(set(DIMS) - set([slicing_dim])) + for dim in non_slicing_dims: + chunk_shape[dim] = global_shape[dim] + chunk_shape = tuple(chunk_shape) + else: + chunk_shape = None + # only create if not already present - otherwise return existing dataset - dataset = file.require_dataset(path, global_shape, data.dtype, exact=True) + dataset = file.require_dataset( + path, + global_shape, + data.dtype, + exact=True, + chunks=chunk_shape, + ) _save_dataset_data(dataset, data, global_shape, global_index) _save_auxiliary_data(file, angles, detector_x, detector_y) diff --git a/tests/method_wrappers/test_save_intermediate.py b/tests/method_wrappers/test_save_intermediate.py index 3e16fe3a0..1f71e489c 100644 --- a/tests/method_wrappers/test_save_intermediate.py +++ b/tests/method_wrappers/test_save_intermediate.py @@ -29,6 +29,7 @@ def save_intermediate_data( data, global_shape: Tuple[int, int, int], global_index: Tuple[int, int, int], + slicing_dim: int, file: h5py.File, path: str, detector_x: int, @@ -39,6 +40,7 @@ def save_intermediate_data( assert data.shape == dummy_block.shape assert global_index == (0, 0, 0) assert global_shape == dummy_block.shape + assert slicing_dim == 0 assert Path(file.filename).name == "task1-testpackage-testmethod-XXX.h5" assert detector_x == 10 assert detector_y == 20 @@ -79,6 +81,7 @@ def save_intermediate_data( data, global_shape: Tuple[int, int, int], global_index: Tuple[int, int, int], + slicing_dim: int, file: h5py.File, path: str, detector_x: int, @@ -125,6 +128,7 @@ def save_intermediate_data( data, global_shape: Tuple[int, int, int], global_index: Tuple[int, int, int], + slicing_dim: int, file: h5py.File, path: str, detector_x: int, diff --git a/tests/test_methods.py b/tests/test_methods.py index 16f97347b..a6057673d 100644 --- a/tests/test_methods.py +++ b/tests/test_methods.py @@ -96,6 +96,7 @@ def test_save_intermediate_data(tmp_path: Path): b2.data, b2.global_shape, b2.global_index, + b2.slicing_dim, file, path="/data", detector_x=10, @@ -106,6 +107,7 @@ def test_save_intermediate_data(tmp_path: Path): b1.data, b1.global_shape, b1.global_index, + b1.slicing_dim, file, path="/data", detector_x=10, @@ -169,6 +171,7 @@ def test_save_intermediate_data_mpi(tmp_path: Path): b2.data, b2.global_shape, b2.global_index, + b2.slicing_dim, file, path="/data", detector_x=10, @@ -179,6 +182,7 @@ def test_save_intermediate_data_mpi(tmp_path: Path): b1.data, b1.global_shape, b1.global_index, + b1.slicing_dim, file, path="/data", detector_x=10, From 5ac4ec3514003f38ee776be73b323d5336563610 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Thu, 13 Jun 2024 16:59:07 +0100 Subject: [PATCH 2/5] Change chunk intermediate flag to specify frames per-chunk Setting the `--frames-per-chunk` flag to 0 will write the intermediate data as contiguous, and any value > 0 will be the number of frames (projections or sinograms) per-chunk. Thus, this preserves the ability to turn chunking on/off when writing intermediate data, but also adds the ability to choose the number of frames per-chunk. --- httomo/cli.py | 11 ++++++----- httomo/globals.py | 2 +- httomo/methods.py | 4 ++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/httomo/cli.py b/httomo/cli.py index bd2cc86fb..00f561a0a 100644 --- a/httomo/cli.py +++ b/httomo/cli.py @@ -121,9 +121,10 @@ def check(yaml_config: Path, in_data_file: Optional[Path] = None): help="Port on the host the syslog server is running on", ) @click.option( - "--chunk-intermediate", - is_flag=True, - help="Write intermediate data in chunked format", + "--frames-per-chunk", + type=click.IntRange(0), + default=1, + help="Number of frames per-chunk in intermediate data (0 = write as contiguous)", ) def run( in_data_file: Path, @@ -139,10 +140,10 @@ def run( monitor_output: TextIO, syslog_host: str, syslog_port: int, - chunk_intermediate: bool, + frames_per_chunk: int, ): """Run a pipeline defined in YAML on input data.""" - httomo.globals.CHUNK_INTERMEDIATE = chunk_intermediate + httomo.globals.FRAMES_PER_CHUNK = frames_per_chunk comm = MPI.COMM_WORLD does_contain_sweep = is_sweep_pipeline(yaml_config) diff --git a/httomo/globals.py b/httomo/globals.py index 915538f04..f1adb6bac 100644 --- a/httomo/globals.py +++ b/httomo/globals.py @@ -9,4 +9,4 @@ ) SYSLOG_SERVER = "localhost" SYSLOG_PORT = 514 -CHUNK_INTERMEDIATE: bool = False +FRAMES_PER_CHUNK: int = 1 diff --git a/httomo/methods.py b/httomo/methods.py index e0ca3871f..ac4d36415 100644 --- a/httomo/methods.py +++ b/httomo/methods.py @@ -45,9 +45,9 @@ def save_intermediate_data( angles: np.ndarray, ) -> None: """Saves intermediate data to a file, including auxiliary""" - if httomo.globals.CHUNK_INTERMEDIATE: + if httomo.globals.FRAMES_PER_CHUNK > 0: chunk_shape = [0, 0, 0] - chunk_shape[slicing_dim] = 1 + chunk_shape[slicing_dim] = httomo.globals.FRAMES_PER_CHUNK DIMS = [0, 1, 2] non_slicing_dims = list(set(DIMS) - set([slicing_dim])) for dim in non_slicing_dims: From f2d41a58f350029d298e1e71742a6bbdb0413216 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Fri, 14 Jun 2024 11:03:53 +0100 Subject: [PATCH 3/5] Pass frames per chunk as param to data saving method The method function now is standalone and has no reliance on the framework's global variable `FRAMES_PER_CHUNK` to know the number of frames per-chunk to write. Instead, the associated method wrapper passes the framework's global variable `FRAMES_PER_CHUNK` through to the method function. --- httomo/method_wrappers/save_intermediate.py | 1 + httomo/methods.py | 5 +++-- tests/method_wrappers/test_save_intermediate.py | 12 ++++++++++-- tests/test_methods.py | 4 ++++ 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/httomo/method_wrappers/save_intermediate.py b/httomo/method_wrappers/save_intermediate.py index af76bafaf..ef7b45486 100644 --- a/httomo/method_wrappers/save_intermediate.py +++ b/httomo/method_wrappers/save_intermediate.py @@ -68,6 +68,7 @@ def execute(self, block: T) -> T: global_index=block.global_index, slicing_dim=block.slicing_dim, file=self._file, + frames_per_chunk=httomo.globals.FRAMES_PER_CHUNK, path="/data", detector_x=self._loader.detector_x, detector_y=self._loader.detector_y, diff --git a/httomo/methods.py b/httomo/methods.py index ac4d36415..03232d253 100644 --- a/httomo/methods.py +++ b/httomo/methods.py @@ -39,15 +39,16 @@ def save_intermediate_data( global_index: Tuple[int, int, int], slicing_dim: int, file: h5py.File, + frames_per_chunk: int, path: str, detector_x: int, detector_y: int, angles: np.ndarray, ) -> None: """Saves intermediate data to a file, including auxiliary""" - if httomo.globals.FRAMES_PER_CHUNK > 0: + if frames_per_chunk > 0: chunk_shape = [0, 0, 0] - chunk_shape[slicing_dim] = httomo.globals.FRAMES_PER_CHUNK + chunk_shape[slicing_dim] = frames_per_chunk DIMS = [0, 1, 2] non_slicing_dims = list(set(DIMS) - set([slicing_dim])) for dim in non_slicing_dims: diff --git a/tests/method_wrappers/test_save_intermediate.py b/tests/method_wrappers/test_save_intermediate.py index 1f71e489c..6c9f60b9e 100644 --- a/tests/method_wrappers/test_save_intermediate.py +++ b/tests/method_wrappers/test_save_intermediate.py @@ -1,5 +1,6 @@ from pathlib import Path from typing import Tuple +from unittest import mock import pytest from pytest_mock import MockerFixture from httomo.method_wrappers import make_method_wrapper @@ -20,6 +21,7 @@ def test_save_intermediate( mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: Path ): + FRAMES_PER_CHUNK = 0 loader: LoaderInterface = mocker.create_autospec( LoaderInterface, instance=True, detector_x=10, detector_y=20 ) @@ -30,6 +32,7 @@ def save_intermediate_data( global_shape: Tuple[int, int, int], global_index: Tuple[int, int, int], slicing_dim: int, + frames_per_chunk: int, file: h5py.File, path: str, detector_x: int, @@ -41,6 +44,7 @@ def save_intermediate_data( assert global_index == (0, 0, 0) assert global_shape == dummy_block.shape assert slicing_dim == 0 + assert frames_per_chunk == FRAMES_PER_CHUNK assert Path(file.filename).name == "task1-testpackage-testmethod-XXX.h5" assert detector_x == 10 assert detector_y == 20 @@ -65,7 +69,8 @@ def save_intermediate_data( prev_method=prev_method, ) assert isinstance(wrp, SaveIntermediateFilesWrapper) - res = wrp.execute(dummy_block) + with mock.patch("httomo.globals.FRAMES_PER_CHUNK", FRAMES_PER_CHUNK): + res = wrp.execute(dummy_block) assert res == dummy_block @@ -119,6 +124,7 @@ def test_save_intermediate_leaves_gpu_data( if gpu and not gpu_enabled: pytest.skip("No GPU available") + FRAMES_PER_CHUNK = 0 loader: LoaderInterface = mocker.create_autospec( LoaderInterface, instance=True, detector_x=10, detector_y=20 ) @@ -130,6 +136,7 @@ def save_intermediate_data( global_index: Tuple[int, int, int], slicing_dim: int, file: h5py.File, + frames_per_chunk: int, path: str, detector_x: int, detector_y: int, @@ -161,6 +168,7 @@ def save_intermediate_data( dummy_block.to_gpu() assert dummy_block.is_gpu == gpu - res = wrp.execute(dummy_block) + with mock.patch("httomo.globals.FRAMES_PER_CHUNK", FRAMES_PER_CHUNK): + res = wrp.execute(dummy_block) assert res.is_gpu == gpu diff --git a/tests/test_methods.py b/tests/test_methods.py index a6057673d..992bee9b6 100644 --- a/tests/test_methods.py +++ b/tests/test_methods.py @@ -98,6 +98,7 @@ def test_save_intermediate_data(tmp_path: Path): b2.global_index, b2.slicing_dim, file, + frames_per_chunk=0, path="/data", detector_x=10, detector_y=20, @@ -109,6 +110,7 @@ def test_save_intermediate_data(tmp_path: Path): b1.global_index, b1.slicing_dim, file, + frames_per_chunk=0, path="/data", detector_x=10, detector_y=20, @@ -173,6 +175,7 @@ def test_save_intermediate_data_mpi(tmp_path: Path): b2.global_index, b2.slicing_dim, file, + frames_per_chunk=0, path="/data", detector_x=10, detector_y=20, @@ -184,6 +187,7 @@ def test_save_intermediate_data_mpi(tmp_path: Path): b1.global_index, b1.slicing_dim, file, + frames_per_chunk=0, path="/data", detector_x=10, detector_y=20, From 307ee6215eaa2bd69dc35c64233f940eb9420854 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Fri, 14 Jun 2024 11:46:15 +0100 Subject: [PATCH 4/5] Add tests for writing chunked intermediate data --- tests/test_methods.py | 121 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tests/test_methods.py b/tests/test_methods.py index 992bee9b6..4e4665477 100644 --- a/tests/test_methods.py +++ b/tests/test_methods.py @@ -206,3 +206,124 @@ def test_save_intermediate_data_mpi(tmp_path: Path): np.testing.assert_array_equal(file["test_file.h5"], [0, 0]) assert "data_dims" in file np.testing.assert_array_equal(file["data_dims"]["detector_x_y"], [10, 20]) + + +@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5]) +def test_save_intermediate_data_frames_per_chunk( + tmp_path: Path, + frames_per_chunk: int, +): + FILE_NAME = "test_file.h5" + DATA_PATH = "/data" + GLOBAL_SHAPE = (10, 10, 10) + global_data = np.arange(np.prod(GLOBAL_SHAPE), dtype=np.float32).reshape( + GLOBAL_SHAPE + ) + aux_data = AuxiliaryData(angles=np.ones(GLOBAL_SHAPE[0], dtype=np.float32)) + block = DataSetBlock( + data=global_data, + aux_data=aux_data, + slicing_dim=0, + block_start=0, + chunk_start=0, + chunk_shape=GLOBAL_SHAPE, + global_shape=GLOBAL_SHAPE, + ) + + with h5py.File(tmp_path / FILE_NAME, "w") as f: + save_intermediate_data( + data=block.data, + global_shape=block.global_shape, + global_index=block.global_index, + slicing_dim=block.slicing_dim, + file=f, + frames_per_chunk=frames_per_chunk, + path=DATA_PATH, + detector_x=block.global_shape[2], + detector_y=block.global_shape[1], + angles=block.angles, + ) + + # Define the expected chunk shape, based on the `frames_per_chunk` value and the slicing + # dim of the data that was saved + expected_chunk_shape = [0, 0, 0] + expected_chunk_shape[block.slicing_dim] = frames_per_chunk + DIMS = [0, 1, 2] + non_slicing_dims = list(set(DIMS) - set([block.slicing_dim])) + for dim in non_slicing_dims: + expected_chunk_shape[dim] = block.global_shape[dim] + + with h5py.File(tmp_path / FILE_NAME, "r") as f: + chunk_shape = f[DATA_PATH].chunks + + if frames_per_chunk != 0: + assert chunk_shape == tuple(expected_chunk_shape) + else: + assert chunk_shape is None + + +@pytest.mark.mpi +@pytest.mark.skipif( + MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" +) +@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5]) +def test_save_intermediate_data_frames_per_chunk_mpi( + tmp_path: Path, + frames_per_chunk: int, +): + COMM = MPI.COMM_WORLD + tmp_path = COMM.bcast(tmp_path) + FILE_NAME = "test_file.h5" + DATA_PATH = "/data" + SLICING_DIM = 0 + GLOBAL_SHAPE = (10, 10, 10) + CHUNK_SIZE = GLOBAL_SHAPE[SLICING_DIM] // 2 + global_data = np.arange(np.prod(GLOBAL_SHAPE), dtype=np.float32).reshape( + GLOBAL_SHAPE + ) + aux_data = AuxiliaryData(angles=np.ones(GLOBAL_SHAPE[0], dtype=np.float32)) + rank_data = ( + global_data[:CHUNK_SIZE, :, :] + if COMM.rank == 0 + else global_data[CHUNK_SIZE:, :, :] + ) + block = DataSetBlock( + data=rank_data, + aux_data=aux_data, + slicing_dim=0, + block_start=0, + chunk_start=0 if COMM.rank == 0 else CHUNK_SIZE, + global_shape=GLOBAL_SHAPE, + chunk_shape=(CHUNK_SIZE, GLOBAL_SHAPE[1], GLOBAL_SHAPE[2]), + ) + + with h5py.File(tmp_path / FILE_NAME, "w", driver="mpio", comm=COMM) as f: + save_intermediate_data( + data=block.data, + global_shape=block.global_shape, + global_index=block.global_index, + slicing_dim=block.slicing_dim, + file=f, + frames_per_chunk=frames_per_chunk, + path=DATA_PATH, + detector_x=block.global_shape[2], + detector_y=block.global_shape[1], + angles=block.angles, + ) + + # Define the expected chunk shape, based on the `frames_per_chunk` value and the slicing + # dim of the data that was saved + expected_chunk_shape = [0, 0, 0] + expected_chunk_shape[block.slicing_dim] = frames_per_chunk + DIMS = [0, 1, 2] + non_slicing_dims = list(set(DIMS) - set([block.slicing_dim])) + for dim in non_slicing_dims: + expected_chunk_shape[dim] = block.global_shape[dim] + + with h5py.File(tmp_path / FILE_NAME, "r") as f: + chunk_shape = f[DATA_PATH].chunks + + if frames_per_chunk != 0: + assert chunk_shape == tuple(expected_chunk_shape) + else: + assert chunk_shape is None From aef83c9120bd047f1ed3507de32fb0417d05eed7 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Fri, 14 Jun 2024 14:08:12 +0100 Subject: [PATCH 5/5] Use 1 frame per chunk if flag value exceeds slicing dim length If the value provided for the `--frames-per-chunk` flag is larger than the length of the data's slicing dimension, then the number of frames per-chunk will be set to 1 (and will be logged in the verbose logfile). This is to avoid `h5py` errors with trying to create a chunk whose shape exceeds the boundaries of the data being written. --- httomo/methods.py | 12 +++++++++++- tests/test_methods.py | 12 ++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/httomo/methods.py b/httomo/methods.py index 03232d253..e30ec04c1 100644 --- a/httomo/methods.py +++ b/httomo/methods.py @@ -1,3 +1,4 @@ +import logging import pathlib from typing import Tuple import numpy as np @@ -5,7 +6,7 @@ import httomo from httomo.runner.dataset import DataSetBlock -from httomo.utils import xp +from httomo.utils import log_once, xp __all__ = ["calculate_stats", "save_intermediate_data"] @@ -46,6 +47,15 @@ def save_intermediate_data( angles: np.ndarray, ) -> None: """Saves intermediate data to a file, including auxiliary""" + if frames_per_chunk > data.shape[slicing_dim]: + warn_message = ( + f"frames_per_chunk={frames_per_chunk} exceeds number of elements in " + f"slicing dim={slicing_dim} of data with shape {data.shape}. Falling " + "back to 1 frame per-chunk" + ) + log_once(warn_message, logging.DEBUG) + frames_per_chunk = 1 + if frames_per_chunk > 0: chunk_shape = [0, 0, 0] chunk_shape[slicing_dim] = frames_per_chunk diff --git a/tests/test_methods.py b/tests/test_methods.py index 4e4665477..7e3687e23 100644 --- a/tests/test_methods.py +++ b/tests/test_methods.py @@ -208,7 +208,7 @@ def test_save_intermediate_data_mpi(tmp_path: Path): np.testing.assert_array_equal(file["data_dims"]["detector_x_y"], [10, 20]) -@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5]) +@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5, 1000]) def test_save_intermediate_data_frames_per_chunk( tmp_path: Path, frames_per_chunk: int, @@ -247,7 +247,9 @@ def test_save_intermediate_data_frames_per_chunk( # Define the expected chunk shape, based on the `frames_per_chunk` value and the slicing # dim of the data that was saved expected_chunk_shape = [0, 0, 0] - expected_chunk_shape[block.slicing_dim] = frames_per_chunk + expected_chunk_shape[block.slicing_dim] = ( + frames_per_chunk if frames_per_chunk != 1000 else 1 + ) DIMS = [0, 1, 2] non_slicing_dims = list(set(DIMS) - set([block.slicing_dim])) for dim in non_slicing_dims: @@ -266,7 +268,7 @@ def test_save_intermediate_data_frames_per_chunk( @pytest.mark.skipif( MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" ) -@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5]) +@pytest.mark.parametrize("frames_per_chunk", [0, 1, 5, 1000]) def test_save_intermediate_data_frames_per_chunk_mpi( tmp_path: Path, frames_per_chunk: int, @@ -314,7 +316,9 @@ def test_save_intermediate_data_frames_per_chunk_mpi( # Define the expected chunk shape, based on the `frames_per_chunk` value and the slicing # dim of the data that was saved expected_chunk_shape = [0, 0, 0] - expected_chunk_shape[block.slicing_dim] = frames_per_chunk + expected_chunk_shape[block.slicing_dim] = ( + frames_per_chunk if frames_per_chunk != 1000 else 1 + ) DIMS = [0, 1, 2] non_slicing_dims = list(set(DIMS) - set([block.slicing_dim])) for dim in non_slicing_dims: