Metadata race conditions prevent multi-position parallelization #444

talonchandler opened this issue Oct 3, 2023 · 2 comments

talonchandler commented Oct 3, 2023

Typical recOrder reconstructions use globs to select multiple positions like this:

recorder apply-inv-tf -i PAL027_DENV_A549_raw.zarr/*/*/* -t phase_tf.zarr -c ./phase.yaml -o test_output.zarr

which multiprocesses over positions. This works well, but it requires a lot of memory on a single node.

One option is to send individual positions to different jobs/processes/nodes with:

recorder apply-inv-tf -i PAL027_DENV_A549_raw.zarr/A/1/1/ -t phase_tf.zarr -c ./phase.yaml -o test_output.zarr & 
recorder apply-inv-tf -i PAL027_DENV_A549_raw.zarr/B/2/2/ -t phase_tf.zarr -c ./phase.yaml -o test_output.zarr &

but this causes a metadata race condition. Both processes see that no metadata file exists, so each creates a new one containing only its own position, and whichever process finishes writing last "wins". For example, if the first process (A/1/1) finishes last, iohub info will show only

Row names:		 ['A']
Column names:		 ['1']
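
In other words, this is a read-modify-write race on the plate-level metadata: each process reads (or fails to find) the metadata, rebuilds it containing only its own position, and writes it back, so the last writer silently discards the other's entry. A minimal, generic sketch of the pattern (illustrative only, not recOrder's or iohub's actual code; the file layout and helper name are hypothetical):

import json
from pathlib import Path

def register_position(store: Path, row: str, col: str) -> None:
    """Hypothetical read-modify-write of plate metadata; unsafe when run concurrently."""
    zattrs = store / ".zattrs"
    # 1. Read the current plate metadata, or start from scratch if it doesn't exist.
    meta = json.loads(zattrs.read_text()) if zattrs.exists() else {"plate": {"wells": []}}
    # 2. Add this process's well entry.
    meta["plate"]["wells"].append({"path": f"{row}/{col}"})
    # 3. Write it back. If another process wrote between steps 1 and 3,
    #    its entry is silently overwritten -- last writer wins.
    zattrs.write_text(json.dumps(meta))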

@ziw-liu and I discussed this, and we're strongly considering changing recorder compute-tf to recorder prepare. Instead of just computing the transfer function, recorder prepare will take a list of positions and a list of configs (one for each channel), compute the transfer functions, and prepare the output zarr store so that the parallelizable recorder apply-inv-tf can fill it.

This means that create_empty_zarr (or a near variant) can be called once in advance, instead of in each process, which is what creates the race condition.
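
A minimal sketch of that single-writer pattern, without a job scheduler, reusing the recOrder CLI internals that appear in the script later in this thread (the function names and keyword arguments are taken from that script and may differ across recOrder versions):

import glob
from pathlib import Path

from natsort import natsorted
from recOrder.cli.apply_inverse_transfer_function import (
    apply_inverse_transfer_function_single_position,
    get_reconstruction_output_metadata,
)
from recOrder.cli.utils import create_empty_hcs_zarr

input_position_dirpaths = [
    Path(p) for p in natsorted(glob.glob("PAL027_DENV_A549_raw.zarr/*/*/*"))
]
config_path = Path("./phase.yaml")
transfer_function_path = Path("./phase_tf.zarr")  # assumed already computed with compute-tf
output_dirpath = Path("./test_output.zarr")

# "prepare" step: create the full output plate (all positions) exactly once, up front
output_metadata = get_reconstruction_output_metadata(input_position_dirpaths[0], config_path)
create_empty_hcs_zarr(
    store_path=output_dirpath,
    position_keys=[p.parts[-3:] for p in input_position_dirpaths],
    **output_metadata,
)

# parallelizable step: each worker only fills its own pre-created position,
# so no process touches the shared plate metadata again
for input_position_dirpath in input_position_dirpaths:  # in practice, one job per position
    apply_inverse_transfer_function_single_position(
        input_position_dirpath=input_position_dirpath,
        transfer_function_dirpath=transfer_function_path,
        config_filepath=config_path,
        output_position_dirpath=output_dirpath / Path(*input_position_dirpath.parts[-3:]),
        num_processes=1,
    )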

ziw-liu commented Oct 3, 2023

A workaround for now is to use a different output path for each process (use $input_fov_name as the output path in the Slurm script), and then aggregate the results with a small Python script that calls iohub.ngff.Plate.from_positions().
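
For illustration only, a copy-based version of that aggregation step could look like the sketch below. It uses the generic open_ome_zarr writing API instead of Plate.from_positions() (which presumably assembles the plate without copying data), and it assumes each per-process store is an HCS store holding exactly one position and is named row_col_fov.zarr; both assumptions should be checked against the iohub version in use.

import glob
from pathlib import Path

from iohub import open_ome_zarr

# one store per FOV, each written by an independent `recorder apply-inv-tf` job,
# e.g. ./per_fov_outputs/A_1_1.zarr, ./per_fov_outputs/B_2_2.zarr (hypothetical layout)
per_fov_stores = [Path(p) for p in sorted(glob.glob("./per_fov_outputs/*.zarr"))]

with open_ome_zarr(per_fov_stores[0], mode="r") as first:
    _, first_position = next(iter(first.positions()))
    channel_names = first_position.channel_names

with open_ome_zarr(
    "aggregated_output.zarr", layout="hcs", mode="w-", channel_names=channel_names
) as plate:
    for store in per_fov_stores:
        row, col, fov = store.stem.split("_")  # assumes row_col_fov store names
        with open_ome_zarr(store, mode="r") as src:
            _, src_position = next(iter(src.positions()))  # each store holds one position
            dst_position = plate.create_position(row, col, fov)
            dst_position["0"] = src_position["0"][:]  # copy the image array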

talonchandler changed the title from "Metadata race conditions" to "Metadata race conditions prevent multi-position parallelization" on Oct 31, 2023
edyoshikun commented

In my workflow, I use slurmkit scripts that first call create_empty_hcs_zarr(), compute the transfer function, and then, using the input filepaths, send one position to one node to run the apply-inv-tf step. I haven't encountered any issues doing this. The advantage of slurmkit is that it's all Python, and there is no need to create additional Slurm scripts when you have job dependencies.

I wouldn't merge both functions into one. I think compute-tf has value on its own, and I would keep it separate. What you can do, similar to reconstruct (which calls the compute step and then the apply-inverse step), is add a prepare function that calls create_empty_hcs_zarr and compute_tf.

Some thoughts on the parallelization:

  • We could make a CLI call for a slurmkit submission based on a config file. The only caveat I see is that it would be almost equivalent to having a Slurm script per job or a slurmkit.py file.
  • Go back to having Slurm scripts with dependencies. I am a bit reluctant on this one because of the number of Slurm scripts that pile up depending on how many steps one has to do. In the future, I do imagine that for mantis we would like to string the jobs together so that each position gets reconstructed, registered, stabilized, and segmented with minimal supervision, and a Pythonic way is probably better(?)

This is the script I use with recOrder 0.4.0:

import datetime
import glob
from pathlib import Path

import click
from natsort import natsorted
from slurmkit import SlurmParams, slurm_function, submit_function

from recOrder.cli.apply_inverse_transfer_function import (
    apply_inverse_transfer_function_single_position,
    get_reconstruction_output_metadata,
)
from recOrder.cli.compute_transfer_function import (
    compute_transfer_function_cli,
)
from recOrder.cli.utils import create_empty_hcs_zarr


# Reconstruction parameters
config_path = "./phase.yml"
transfer_function_path = "./TF_phase3D.zarr"

# io parameters
input_position_dirpaths = "/hpc/projects/comp.micro/mantis/2023_08_09_HEK_PCNA_H2B/00-zarr_iohub_010dev4/pcna_timelapse_1/pcna_timelapse_labelfree_1.zarr/0/0/0"
output_dirpath = "./timelapse_reg_1e-5.zarr"
# sbatch and resource parameters
cpus_per_task = 3
mem_per_cpu = "8G"
time = 40  # minutes

# Path handling
input_position_dirpaths = [
    Path(path) for path in natsorted(glob.glob(input_position_dirpaths))
]
output_dirpath = Path(output_dirpath)
slurm_out_path = output_dirpath.parent / "slurm_output/recon-%j.out"

transfer_function_path = Path(transfer_function_path)
config_path = Path(config_path)

## First: compute the transfer function once
compute_transfer_function_cli(
    input_position_dirpath=input_position_dirpaths[0],
    config_filepath=config_path,
    output_dirpath=transfer_function_path,
)

## Second: create the empty output store, then apply-inv-tf per position
output_metadata = get_reconstruction_output_metadata(
    input_position_dirpaths[0], config_path
)

create_empty_hcs_zarr(
    store_path=output_dirpath,
    position_keys=[p.parts[-3:] for p in input_position_dirpaths],
    **output_metadata,
)

click.echo(f"in: {input_position_dirpaths}, out: {output_dirpath}")

# prepare slurm parameters
params = SlurmParams(
    partition="preempted",
    cpus_per_task=cpus_per_task,
    mem_per_cpu=mem_per_cpu,
    time=datetime.timedelta(minutes=time),
    output=slurm_out_path,
)

# wrap apply_inverse_transfer_function_single_position() with slurmkit
slurm_reconstruct_single_position = slurm_function(
    apply_inverse_transfer_function_single_position
)
reconstruct_func = slurm_reconstruct_single_position(
    transfer_function_dirpath=transfer_function_path,
    config_filepath=config_path,
    num_processes=cpus_per_task,
)

# generate one job per position by passing the input and output paths to the slurm-wrapped function
recon_jobs = [
    submit_function(
        reconstruct_func,
        slurm_params=params,
        input_position_dirpath=input_position_dirpath,
        output_position_dirpath=output_dirpath
        / Path(*input_position_dirpath.parts[-3:]),
    )
    for input_position_dirpath in input_position_dirpaths
]
