Skip to content

Commit

Permalink
Add logging to param sweep runner
Browse files Browse the repository at this point in the history
  • Loading branch information
yousefmoazzam committed Jul 9, 2024
1 parent 000b26e commit e4cf022
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions httomo/sweep_runner/param_sweep_runner.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import os
from typing import Any, Dict, List, Optional, Tuple

import tqdm

import httomo
from httomo.data.param_sweep_store import ParamSweepReader, ParamSweepWriter
from httomo.runner.block_split import BlockSplitter
from httomo.runner.method_wrapper import MethodWrapper
from httomo.runner.pipeline import Pipeline
from httomo.sweep_runner.param_sweep_block import ParamSweepBlock
from httomo.sweep_runner.side_output_manager import SideOutputManager
from httomo.sweep_runner.stages import NonSweepStage, Stages, SweepStage
from httomo.utils import log_exception
from httomo.utils import catchtime, log_exception, log_once


class ParamSweepRunner:
Expand Down Expand Up @@ -106,6 +110,7 @@ def prepare(self):
)
raise ValueError(err_str)

log_once(f"Loading data with shape {source.global_shape}")
splitter = BlockSplitter(source, source.global_shape[source.slicing_dim])
dataset_block = splitter[0]
sweep_block = ParamSweepBlock(
Expand All @@ -117,6 +122,7 @@ def prepare(self):
# Run each method in `stage.methods`, in order, against the runner's current block
# (`self._block`), logging the method being run and passing each method's side output to
# the side output manager.
# NOTE(review): leading indentation is missing in this rendering, so the extent of the loop
# body below cannot be confirmed from this listing alone — check against the real file.
def _execute_non_sweep_stage(self, stage: NonSweepStage):
# A block must already have been set on the runner before a non-sweep stage is executed
assert self._block is not None
for method in stage.methods:
# Announce which method (and which package it comes from) is about to run
log_once(f"Running {method.method_name} ({method.package_name})")
# presumably injects previously collected side outputs into the method's parameters —
# confirm against SideOutputManager.update_params
self._side_output_manager.update_params(method)
# The value returned by the method replaces the runner's current block
self._block = method.execute(self._block)
# Hand whatever side output the method reports to the side output manager
self._side_output_manager.append(method.get_side_output())
Expand All @@ -134,7 +140,22 @@ def execute_sweep(self):
writer = ParamSweepWriter(len(self._stages.sweep.values))
method = self._stages.sweep.method

for val in self._stages.sweep.values:
log_once(f"Running {method.method_name} ({method.package_name})")
sweep_info_str = (
f" Parameter sweep over {len(self._stages.sweep.values)} values of "
f"parameter: {self._stages.sweep.param_name}"
)
log_once(sweep_info_str)

# Redirect tqdm progress bar output to /dev/null, and instead manually write sweep
# progress to logfile within loop
progress = tqdm.tqdm(
iterable=self._stages.sweep.values,
file=open(os.devnull, "w"),
unit="value",
ascii=True,
)
for val, _ in zip(self._stages.sweep.values, progress):
# Blocks are modified in-place by method wrappers, so a new block must be created
# that contains a copy of the input data to the sweep stage
block = ParamSweepBlock(
Expand All @@ -143,22 +164,29 @@ def execute_sweep(self):
)
self._side_output_manager.update_params(method)
method.append_config_params({self._stages.sweep.param_name: val})
progress.set_postfix_str(f"{self._stages.sweep.param_name}={val}")
log_once(f" {str(progress)}")
block = method.execute(block)
if len(method.get_side_output().keys()) > 0:
raise ValueError(
"Producing a side output is not supported in parameter sweep methods"
)
writer.write_sweep_result(block)

log_once(" Finished parameter sweep")

reader = ParamSweepReader(writer)
self._block = reader.read_sweep_results()

def execute(self):
"""Load input data and execute all stages (before sweep, sweep, after sweep)"""
# NOTE(review): this listing is a rendered diff without +/- markers. The four calls
# immediately below appear to be the pre-change body that the `catchtime` version further
# down replaces; read literally, both groups together would run every stage twice.
# Confirm against the real file before treating both groups as live code.
self.prepare()
self.execute_before_sweep()
self.execute_sweep()
self.execute_after_sweep()
# presumably `catchtime` measures the duration of the work done under it; `t.elapsed` is
# read for the final log message — confirm in httomo.utils
with catchtime() as t:
# Point the user at the log file location under the run's output directory
log_once(f"See the full log file at: {httomo.globals.run_out_dir}/user.log")
# Same four steps as named in the docstring: load data, then the stages before, during
# and after the sweep
self.prepare()
self.execute_before_sweep()
self.execute_sweep()
self.execute_after_sweep()
# Report the elapsed time, formatted to three decimal places
log_once(f"Pipeline finished. Took {t.elapsed:.3f}s")


def _count_tuple_values(d: Dict[str, Any]) -> int:
Expand Down

0 comments on commit e4cf022

Please sign in to comment.