From 57c07c2c5ca14290101dc356ddc7c4b44ffc2c1e Mon Sep 17 00:00:00 2001 From: Arjun Arkal Rao Date: Fri, 4 May 2018 16:40:28 -0700 Subject: [PATCH] Add memory requirement to DNA alignment (resolves #278) resolves #278 --- src/protect/alignment/dna.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/protect/alignment/dna.py b/src/protect/alignment/dna.py index c087fe93..105fa3d8 100644 --- a/src/protect/alignment/dna.py +++ b/src/protect/alignment/dna.py @@ -65,25 +65,35 @@ def align_dna(job, fastqs, sample_type, univ_options, bwa_options): +- '_fix_pg_sorted.bam.bai': fsID :rtype: dict """ + # The mkdup and regroup steps use picard that allots heap space using the Xmx key in the + # univ_options dictionary. This should reflect in the job allotment. Since We want all these + # jobs to occur on the same node, we ened to give them all the same memory requirements. + bwa = job.wrapJobFn(run_bwa, fastqs, sample_type, univ_options, bwa_options, disk=PromisedRequirement(bwa_disk, fastqs, bwa_options['index']), + memory=univ_options['java_Xmx'], cores=bwa_options['n']) sam2bam = job.wrapJobFn(bam_conversion, bwa.rv(), sample_type, univ_options, bwa_options['samtools'], - disk=PromisedRequirement(sam2bam_disk, bwa.rv())) + disk=PromisedRequirement(sam2bam_disk, bwa.rv()), + memory=univ_options['java_Xmx']) # reheader takes the same disk as sam2bam so we can serialize this on the same worker. reheader = job.wrapJobFn(fix_bam_header, sam2bam.rv(), sample_type, univ_options, bwa_options['samtools'], - disk=PromisedRequirement(sam2bam_disk, bwa.rv())) + disk=PromisedRequirement(sam2bam_disk, bwa.rv()), + memory=univ_options['java_Xmx']) regroup = job.wrapJobFn(add_readgroups, reheader.rv(), sample_type, univ_options, bwa_options['picard'], - disk=PromisedRequirement(regroup_disk, reheader.rv())) + disk=PromisedRequirement(regroup_disk, reheader.rv()), + memory=univ_options['java_Xmx']) mkdup = job.wrapJobFn(mark_duplicates, regroup.rv(), sample_type, univ_options, bwa_options['picard'], - disk=PromisedRequirement(mkdup_disk, regroup.rv())) + disk=PromisedRequirement(mkdup_disk, regroup.rv()), + memory=univ_options['java_Xmx']) index = job.wrapJobFn(index_bamfile, mkdup.rv(), sample_type, univ_options, bwa_options['samtools'], sample_info='fix_pg_sorted', - disk=PromisedRequirement(index_disk, mkdup.rv())) + disk=PromisedRequirement(index_disk, mkdup.rv()), + memory=univ_options['java_Xmx']) job.addChild(bwa) bwa.addChild(sam2bam) sam2bam.addChild(reheader)