parallelize sequential flag with drmaa or multiple cores #133

Open · wants to merge 3 commits into main
144 changes: 115 additions & 29 deletions cubids/cli.py
@@ -8,12 +8,14 @@
import tempfile
import tqdm
import shutil
import drmaa
import pandas as pd
from cubids import CuBIDS
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from .validator import (build_validator_call,
run_validator, parse_validator_output,
build_subject_paths)
build_subject_paths, build_drmaa_batch)
from .metadata_merge import merge_json_into_json

logging.basicConfig(level=logging.INFO)
@@ -68,6 +70,17 @@ def cubids_validate():
'sub-01 sub-02 sub-03',
nargs='+',
required=False)
parser.add_argument('--drmaa',
action='store_true',
default=False,
help='When running the validator sequentially, submit jobs to a scheduler via DRMAA instead of running subprocesses locally',
required=False)
parser.add_argument('--n_cpus',
action='store',
type=int,
default=1,
help='Number of cores to utilize',
required=False)
opts = parser.parse_args()

# Run directly from python using subprocess
@@ -111,6 +124,8 @@ def cubids_validate():
# iterate over the dictionary

parsed = []
# it's easier to parallelize a queue
queue = [] if opts.drmaa or opts.n_cpus > 1 else None

if opts.sequential_subjects:
subjects_dict = {k: v for k, v in subjects_dict.items()
@@ -121,44 +136,115 @@

logger.info(" ".join(["Processing subject:", subject]))
# create a temporary directory and symlink the data
with tempfile.TemporaryDirectory() as tmpdirname:
for fi in files_list:

# cut the path down to the subject label
bids_start = fi.find(subject)

# maybe it's a single file
if bids_start < 1:
bids_folder = tmpdirname
fi_tmpdir = tmpdirname

else:
bids_folder = Path(fi[bids_start:]).parent
fi_tmpdir = tmpdirname + '/' + str(bids_folder)

if not os.path.exists(fi_tmpdir):
os.makedirs(fi_tmpdir)
output = fi_tmpdir + '/' + str(Path(fi).name)
shutil.copy2(fi, output)

# run the validator
nifti_head = opts.ignore_nifti_headers
subj_consist = opts.ignore_subject_consistency
call = build_validator_call(tmpdirname,
nifti_head,
subj_consist)
# TMPDIR isn't networked (not shared between login and exec nodes), so use bids_dir instead
Contributor:

Will this be OK if bids_dir is in a DataLad dataset?

Author:

Hmm, good point; I hadn't thought about needing to unlock stuff. I admit it's very hacky, and it almost makes me think this isn't a good problem to submit to the grid, since it requires so many temporary files that need to be on a network drive (not $TMPDIR). I'm not sure what the best solution would be. Maybe we could use a user's home directory, say ~/.cubids, as the tmpdir?
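
If bids_dir is a DataLad dataset, the annexed file content would need to be locally present before the copy. A rough sketch of how that could look with the DataLad Python API (DataLad is not a dependency of this PR, and this helper is hypothetical; the argument names mirror the surrounding loop):

import shutil
import datalad.api as dl  # assumption: DataLad is installed; not part of this PR

def copy_annexed(fi, bids_dir, output):
    # fetch the annexed content first; shutil.copy2 follows the annex
    # symlink, so the data must actually exist on disk
    dl.get(path=fi, dataset=bids_dir)
    shutil.copy2(fi, output)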

Contributor:

Is there a way to get a tmpdir on the compute node and copy the files into that?

Author (@Terf, Aug 9, 2021):

I think it'd be possible to move more of the logic into the grid job so scripts don't have to be written to a networked drive. But since it's impossible to connect the stdout of a grid job to the main process, the output will ultimately have to be written to some file, which needs to be on a networked drive unless all the jobs, including the main process, run on the same exec node.

tmpdir = tempfile.TemporaryDirectory(dir=opts.bids_dir, prefix=".")
for fi in files_list:

# cut the path down to the subject label
bids_start = fi.find(subject)

# maybe it's a single file
if bids_start < 1:
bids_folder = tmpdir.name
fi_tmpdir = tmpdir.name

else:
bids_folder = Path(fi[bids_start:]).parent
fi_tmpdir = tmpdir.name + '/' + str(bids_folder)

if not os.path.exists(fi_tmpdir):
os.makedirs(fi_tmpdir)
output = fi_tmpdir + '/' + str(Path(fi).name)
shutil.copy2(fi, output)

# run the validator
nifti_head = opts.ignore_nifti_headers
subj_consist = opts.ignore_subject_consistency
call = build_validator_call(tmpdir.name,
nifti_head,
subj_consist)

if queue is None:
ret = run_validator(call)
# parse output
# execute and parse output immediately
if ret.returncode != 0:
logger.error("Errors returned "
"from validator run, parsing now")
"from validator run, parsing now")
Contributor:

This may break flake8.

Author (@Terf, Aug 9, 2021):

Is there a particular formatter, e.g. black or autopep8, that you're using for the project?


# parse the output and add to list if it returns a df
decoded = ret.stdout.decode('UTF-8')
tmp_parse = parse_validator_output(decoded)
if tmp_parse.shape[1] > 1:
tmp_parse['subject'] = subject
parsed.append(tmp_parse)
else:
queue.append({
'call': call,
'tmpdir': tmpdir,
'subject': subject
})


if opts.drmaa:
tmpfiles = []  # defined before the try block so the finally clause can always clean up
try:
drmaa_ses = drmaa.Session()
drmaa_ses.initialize()
jids = []

for batch in build_drmaa_batch(queue):
tmp = tempfile.NamedTemporaryFile(delete=False, dir=opts.bids_dir, prefix=".", suffix=".sh")
Contributor:

Is this something a user can customize? Or will they need to customize it? Does this work out of the box on CUBIC?

Author:

Not sure what would need to be customized? It indeed works out of the box on CUBIC. LSF also supports DRMAA, but PMACS set it up in a weird way and sounded uninterested in changing that when I asked :(
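
For anyone wanting to check their own cluster, a minimal smoke test of the drmaa-python binding, assuming libdrmaa is configured on the system (the command and message here are illustrative, not part of this PR):

import drmaa

# submit one trivial job and block until it finishes;
# if this round-trips, the scheduler binding works out of the box
with drmaa.Session() as s:
    jt = s.createJobTemplate()
    jt.remoteCommand = '/bin/echo'
    jt.args = ['hello from the grid']
    jid = s.runJob(jt)
    info = s.wait(jid, drmaa.Session.TIMEOUT_WAIT_FOREVER)
    print('job', jid, 'exited with status', info.exitStatus)
    s.deleteJobTemplate(jt)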

tmp.write(batch['script'].encode())
tmp.close()  # flush and close before submission; otherwise the scheduler may run a partially written script
os.chmod(tmp.name, 0o755) # make executable
tmpfiles.append(tmp.name)
jt = drmaa_ses.createJobTemplate()
jt.remoteCommand = tmp.name
# jt.args = call[1:]
jt.blockEmail = False
trash = ':' + os.devnull
jt.outputPath = trash
jt.errorPath = trash
jids.append(drmaa_ses.runJob(jt))

# wait for all jobs to finish to parse results
logger.info("Waiting for jobs to complete")
drmaa_ses.synchronize(jids, drmaa.Session.TIMEOUT_WAIT_FOREVER, True)
for q in queue:
# parse output
tmpdir = q['tmpdir']
with open(os.path.join(tmpdir.name, ".cubids"), 'r') as file:
decoded = file.read()
tmp_parse = parse_validator_output(decoded)
if tmp_parse.shape[1] > 1:
tmp_parse['subject'] = q['subject']
parsed.append(tmp_parse)

tmpdir.cleanup()
drmaa_ses.exit()
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(repr(e), fname, exc_tb.tb_lineno)
finally:
for tmp in tmpfiles:
os.remove(tmp)
elif opts.n_cpus > 1:
# run in parallel on multiple cores
logger.info("Running validator in parallel")
with ThreadPoolExecutor(max_workers=opts.n_cpus) as p:
ret = p.map(run_validator, [q['call'] for q in queue])

for q in queue:
q['tmpdir'].cleanup()

# parse output; results come back in queue order
for q, r in zip(queue, ret):
decoded = r.stdout.decode('UTF-8')
tmp_parse = parse_validator_output(decoded)
if tmp_parse.shape[1] > 1:
tmp_parse['subject'] = q['subject']
parsed.append(tmp_parse)


# concatenate the parsed data and exit, we're goin home fellas
if len(parsed) < 1:
16 changes: 16 additions & 0 deletions cubids/validator.py
@@ -68,6 +68,22 @@ def run_validator(call, verbose=True):
stderr=subprocess.PIPE)
return(ret)

def build_drmaa_batch(queue):
# batches of 10 calls per script; for queues of 1000+ calls, grow the batch size so roughly 10 batches are submitted
batch_size = 10 if len(queue) < 1000 else len(queue) // 10
batch = []
for batch_start in range(0, len(queue), batch_size):
script = []
tmpdirs = []
for idx in range(batch_start, min(batch_start + batch_size, len(queue))):
# the BIDS validator ignores files starting with a dot: https://github.com/bids-standard/bids-validator/issues/348
line = " ".join(queue[idx]['call']) + ' > ' + os.path.join(queue[idx]['tmpdir'].name, ".cubids")
script.append(line)
tmpdirs.append(queue[idx]['tmpdir'])
batch.append({
'script': "\n".join(script),
'tmpdirs': tmpdirs
})
return batch
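
For illustration, a small sketch of what the batching produces, using a hypothetical three-entry queue (the validator call and paths are made up; the real calls come from build_validator_call):

import tempfile
from cubids.validator import build_drmaa_batch  # added by this PR

# three fake queue entries yield one batch whose script has three lines,
# each redirecting one validator call's stdout into that entry's tmpdir
queue = [{'call': ['bids-validator', '--json', '/bids/sub-%02d' % i],
          'tmpdir': tempfile.TemporaryDirectory()} for i in range(3)]
for batch in build_drmaa_batch(queue):
    print(batch['script'])
# prints, e.g.:
# bids-validator --json /bids/sub-00 > /tmp/tmpabc123/.cubids
# bids-validator --json /bids/sub-01 > /tmp/tmpdef456/.cubids
# bids-validator --json /bids/sub-02 > /tmp/tmpghi789/.cubids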

def parse_validator_output(output):
"""Parse the JSON output of the BIDS validator into a pandas dataframe
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
drmaa