diff --git a/src/protect/alignment/dna.py b/src/protect/alignment/dna.py index c087fe93..105fa3d8 100644 --- a/src/protect/alignment/dna.py +++ b/src/protect/alignment/dna.py @@ -65,25 +65,35 @@ def align_dna(job, fastqs, sample_type, univ_options, bwa_options): +- '_fix_pg_sorted.bam.bai': fsID :rtype: dict """ + # The mkdup and regroup steps use picard that allots heap space using the Xmx key in the + # univ_options dictionary. This should reflect in the job allotment. Since we want all these + # jobs to occur on the same node, we need to give them all the same memory requirements. + bwa = job.wrapJobFn(run_bwa, fastqs, sample_type, univ_options, bwa_options, disk=PromisedRequirement(bwa_disk, fastqs, bwa_options['index']), + memory=univ_options['java_Xmx'], cores=bwa_options['n']) sam2bam = job.wrapJobFn(bam_conversion, bwa.rv(), sample_type, univ_options, bwa_options['samtools'], - disk=PromisedRequirement(sam2bam_disk, bwa.rv())) + disk=PromisedRequirement(sam2bam_disk, bwa.rv()), + memory=univ_options['java_Xmx']) # reheader takes the same disk as sam2bam so we can serialize this on the same worker. 
reheader = job.wrapJobFn(fix_bam_header, sam2bam.rv(), sample_type, univ_options, bwa_options['samtools'], - disk=PromisedRequirement(sam2bam_disk, bwa.rv())) + disk=PromisedRequirement(sam2bam_disk, bwa.rv()), + memory=univ_options['java_Xmx']) regroup = job.wrapJobFn(add_readgroups, reheader.rv(), sample_type, univ_options, bwa_options['picard'], - disk=PromisedRequirement(regroup_disk, reheader.rv())) + disk=PromisedRequirement(regroup_disk, reheader.rv()), + memory=univ_options['java_Xmx']) mkdup = job.wrapJobFn(mark_duplicates, regroup.rv(), sample_type, univ_options, bwa_options['picard'], - disk=PromisedRequirement(mkdup_disk, regroup.rv())) + disk=PromisedRequirement(mkdup_disk, regroup.rv()), + memory=univ_options['java_Xmx']) index = job.wrapJobFn(index_bamfile, mkdup.rv(), sample_type, univ_options, bwa_options['samtools'], sample_info='fix_pg_sorted', - disk=PromisedRequirement(index_disk, mkdup.rv())) + disk=PromisedRequirement(index_disk, mkdup.rv()), + memory=univ_options['java_Xmx']) job.addChild(bwa) bwa.addChild(sam2bam) sam2bam.addChild(reheader)