From 82a22ec2e57dec850801efaa7628dcc4e1449506 Mon Sep 17 00:00:00 2001 From: John Vivian Date: Wed, 29 Jun 2016 10:37:00 -0700 Subject: [PATCH] Remove duplicate functions in gatk_preprocessing (resolves #330) --- src/toil_scripts/tools/preprocessing.py | 59 +++++++++++++++++-------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/src/toil_scripts/tools/preprocessing.py b/src/toil_scripts/tools/preprocessing.py index 4439afa6..e2f9b1e3 100644 --- a/src/toil_scripts/tools/preprocessing.py +++ b/src/toil_scripts/tools/preprocessing.py @@ -100,7 +100,7 @@ def run_picard_create_sequence_dictionary(job, ref_id): return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.dict')) -def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, dbsnp, mem='10G'): +def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, dbsnp, mem='10G', unsafe=False): """ Convenience method for grouping together GATK preprocessing @@ -115,13 +115,14 @@ def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mill :param str mills: Mills VCF FileStoreID :param str dbsnp: DBSNP VCF FileStoreID :param str mem: Memory value to be passed to children. Needed for CI tests + :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" :return: BAM and BAI FileStoreIDs from Print Reads :rtype: tuple(str, str) """ - rtc = job.wrapJobFn(run_realigner_target_creator, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem) - ir = job.wrapJobFn(run_indel_realignment, rtc.rv(), bam, bai, ref, ref_dict, fai, phase, mills, mem) - br = job.wrapJobFn(run_base_recalibration, cores, ir.rv(0), ir.rv(1), ref, ref_dict, fai, dbsnp, mem) - pr = job.wrapJobFn(run_print_reads, cores, br.rv(), ir.rv(0), ir.rv(1), ref, ref_dict, fai, mem) + rtc = job.wrapJobFn(run_realigner_target_creator, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe) + ir = job.wrapJobFn(run_indel_realignment, rtc.rv(), bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe) + br = job.wrapJobFn(run_base_recalibration, cores, ir.rv(0), ir.rv(1), ref, ref_dict, fai, dbsnp, mem, unsafe) + pr = job.wrapJobFn(run_print_reads, cores, br.rv(), ir.rv(0), ir.rv(1), ref, ref_dict, fai, mem, unsafe) # Wiring job.addChild(rtc) rtc.addChild(ir) @@ -130,7 +131,7 @@ def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mill return pr.rv(0), pr.rv(1) -def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem='10G'): +def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe=False): """ Creates intervals file needed for indel realignment @@ -144,13 +145,14 @@ def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase :param str phase: Phase VCF FileStoreID :param str mills: Mills VCF FileStoreID :param str mem: Memory value to be passed to children. Needed for CI tests + :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" :return: FileStoreID for the processed bam :rtype: str """ work_dir = job.fileStore.getLocalTempDir() file_ids = [ref, fai, ref_dict, bam, bai, phase, mills] - file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf'] - for file_store_id, name in zip(file_ids, file_names): + inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf'] + for file_store_id, name in zip(file_ids, inputs): job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) # Call: GATK -- RealignerTargetCreator parameters = ['-T', 'RealignerTargetCreator', @@ -161,13 +163,17 @@ def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase '-known', '/data/mills.vcf', '--downsampling_type', 'NONE', '-o', '/data/sample.intervals'] + if unsafe: + parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', + inputs=inputs, + outputs={'sample.intervals': None}, work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) # Write to fileStore return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.intervals')) -def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, mills, mem): +def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe=False): """ Creates realigned bams using the intervals file from previous step @@ -181,14 +187,15 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, m :param str phase: Phase VCF FileStoreID :param str mills: Mills VCF FileStoreID :param str mem: Memory value to be passed to children. Needed for CI tests + :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" :return: FileStoreID for the processed bam :rtype: tuple(str, str) """ work_dir = job.fileStore.getLocalTempDir() file_ids = [ref, fai, ref_dict, intervals, bam, bai, phase, mills] - file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.intervals', - 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf'] - for file_store_id, name in zip(file_ids, file_names): + inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.intervals', + 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf'] + for file_store_id, name in zip(file_ids, inputs): job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) # Call: GATK -- IndelRealigner parameters = ['-T', 'IndelRealigner', @@ -201,7 +208,11 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, m '-maxReads', str(720000), '-maxInMemory', str(5400000), '-o', '/data/sample.indel.bam'] + if unsafe: + parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', + inputs=inputs, + outputs={'sample.indel.bam': None, 'sample.indel.bai': None}, work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) # Write to fileStore indel_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.indel.bam')) @@ -209,7 +220,7 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, m return indel_bam, indel_bai -def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai, dbsnp, mem): +def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai, dbsnp, mem, unsafe=False): """ Creates recal table used in Base Quality Score Recalibration @@ -222,13 +233,14 @@ def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai, :param str fai: Reference index FileStoreID :param str dbsnp: DBSNP VCF FileStoreID :param str mem: Memory value to be passed to children. Needed for CI tests + :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" :return: FileStoreID for the processed bam :rtype: str """ work_dir = job.fileStore.getLocalTempDir() file_ids = [ref, fai, ref_dict, indel_bam, indel_bai, dbsnp] - file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.indel.bam', 'sample.indel.bai', 'dbsnp.vcf'] - for file_store_id, name in zip(file_ids, file_names): + inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.indel.bam', 'sample.indel.bai', 'dbsnp.vcf'] + for file_store_id, name in zip(file_ids, inputs): job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) # Call: GATK -- IndelRealigner parameters = ['-T', 'BaseRecalibrator', @@ -237,13 +249,17 @@ def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai, '-I', '/data/sample.indel.bam', '-knownSites', '/data/dbsnp.vcf', '-o', '/data/sample.recal.table'] + if unsafe: + parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', + inputs=inputs, + outputs={'sample.recal.table': None}, work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) # Write output to file store return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.recal.table')) -def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai, mem): +def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai, mem, unsafe=False): """ Creates BAM that has had the base quality scores recalibrated @@ -256,14 +272,15 @@ def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai, :param str ref_dict: Reference dictionary FileStoreID :param str fai: Reference index FileStoreID :param str mem: Memory value to be passed to children. Needed for CI tests + :param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" :return: FileStoreID for the processed bam :rtype: tuple(str, str) """ work_dir = job.fileStore.getLocalTempDir() file_ids = [ref, fai, ref_dict, table, indel_bam, indel_bai] - file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.recal.table', - 'sample.indel.bam', 'sample.indel.bai'] - for file_store_id, name in zip(file_ids, file_names): + inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.recal.table', + 'sample.indel.bam', 'sample.indel.bai'] + for file_store_id, name in zip(file_ids, inputs): job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) # Call: GATK -- PrintReads parameters = ['-T', 'PrintReads', @@ -273,7 +290,11 @@ def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai, '-I', '/data/sample.indel.bam', '-BQSR', '/data/sample.recal.table', '-o', '/data/sample.bqsr.bam'] + if unsafe: + parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', + inputs=inputs, + outputs={'sample.bqsr.bam': None, 'sample.bqsr.bai': None}, work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem))) # Write ouptut to file store bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bqsr.bam'))