diff --git a/src/toil_scripts/quinine_pipelines/__init__.py b/src/toil_scripts/quinine_pipelines/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/toil_scripts/quinine_pipelines/adam_helpers.py b/src/toil_scripts/quinine_pipelines/adam_helpers.py new file mode 100644 index 00000000..5602d290 --- /dev/null +++ b/src/toil_scripts/quinine_pipelines/adam_helpers.py @@ -0,0 +1,57 @@ +from subprocess import check_call + +from toil_scripts.adam_pipeline.adam_preprocessing import call_adam + +def __call_adam_native(cmd, memory, native_path): + ''' + Calls ADAM running in Spark local mode, where ADAM is not in a docker container. + + :param list cmd: ADAM command line arguments + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + check_call(['%s/bin/adam-submit' % native_path, + '--master', 'local[*]', + '--conf', 'spark.driver.memory=%dg' % memory, + '--'] + cmd) + + +def bam_to_adam_native(bam, parquet, memory, native_path): + ''' + Converts a BAM file into an ADAM AlignmentRecord Parquet file. + + :param str bam: Path to input SAM/BAM file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['transform', bam, parquet], memory, native_path) + + +def feature_to_adam_native(feature, parquet, memory, native_path): + ''' + Converts a feature file (e.g., BED, GTF, GFF) into an ADAM Feature Parquet + file. + + :param str feature: Path to input BED/GTF/GFF/NarrowPeak file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['features2adam', feature, parquet], memory, native_path) + + +def vcf_to_adam_native(vcf, parquet, memory, native_path): + ''' + Converts a VCF file into an ADAM Genotype Parquet file. + + :param str bam: Path to input VCF file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['vcf2adam', vcf, parquet], memory, native_path) diff --git a/src/toil_scripts/quinine_pipelines/metrics.py b/src/toil_scripts/quinine_pipelines/metrics.py new file mode 100644 index 00000000..20f8c249 --- /dev/null +++ b/src/toil_scripts/quinine_pipelines/metrics.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python2.7 + +import argparse +import os + +from toil.job import Job + +from toil_scripts.tools.qc import ( call_quinine_af_native, + call_quinine_contamination_native, + call_quinine_hs_native, + call_quinine_rna_native ) +from toil_scripts.quinine_pipelines.adam_helpers import ( bam_to_adam_native, + feature_to_adam_native ) + +def run_rna_qc(job, + reads, transcriptome, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various RNA-seq quality control statistics. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str transcriptome: Path to the transcriptome definition (GTF/GFF). + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. + ''' + + # get a temp work directory + local_dir = job.fileStore.getLocalTempDir() + + # convert the reads to ADAM format + adam_reads = os.path.join(local_dir, 'reads.adam') + bam_to_adam_native(reads, adam_reads, memory, adam_native_path) + + # convert the features to ADAM format + adam_features = os.path.join(local_dir, 'transcriptome.adam') + feature_to_adam_native(transcriptome, adam_features, memory, adam_native_path) + + # run the qc job + call_quinine_rna_native(adam_reads, adam_features, local_dir, memory, quinine_native_path) + + +def run_targeted_qc(job, + reads, bait, targets, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various quality control statistics for reads + sequenced using a hybrid-selection panel that requires targeting. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str bait: Path to the description of the baited regions. + :param str targets: Path to the description of the regions targeted. + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. + ''' + + # get a temp work directory + local_dir = job.fileStore.getLocalTempDir() + + # convert the reads to ADAM format + adam_reads = os.path.join(local_dir, 'reads.adam') + bam_to_adam_native(reads, adam_reads, memory, adam_native_path) + + # convert the bait features to ADAM format + adam_bait = os.path.join(local_dir, 'bait.adam') + feature_to_adam_native(bait, adam_bait, memory, adam_native_path) + + # convert the target features to ADAM format + adam_targets = os.path.join(local_dir, 'targets.adam') + feature_to_adam_native(targets, adam_targets, memory, adam_native_path) + + # run the metrics + call_quinine_hs_native(adam_reads, adam_targets, adam_bait, + local_dir, + memory, quinine_native_path) + + +def run_contamination_estimation(job, + reads, population, sample_vcf, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various quality control statistics for reads + sequenced using a hybrid-selection panel that requires targeting. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str bait: Path to the description of the baited regions. + :param str targets: Path to the description of the regions targeted. + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. + ''' + + # get a temp work directory + local_dir = job.fileStore.getLocalTempDir() + + # convert the reads to ADAM format + adam_reads = os.path.join(local_dir, 'reads.adam') + bam_to_adam_native(reads, adam_reads, memory, adam_native_path) + + # convert the population vcf to ADAM format + adam_population = os.path.join(local_dir, 'population.adam') + vcf_to_adam_native(population, adam_population, memory, adam_native_path) + + # convert the sample vcf to ADAM format + adam_calls = os.path.join(local_dir, 'sample.adam') + vcf_to_adam_native(sample_vcf, adam_calls, memory, adam_native_path) + + # compute MAF's + maf_annotations = os.path.join(local_dir, 'mafs.adam') + call_quinine_af_native(adam_population, maf_annotations, + local_dir, + memory, + quinine_native_path) + + # estimate contamination + call_quinine_contamination_native(adam_reads, + adam_calls, + maf_annotations, + local_dir, + memory, + native_path) + + +def __add_common_args(parser): + ''' + Adds commonly used arguments to a subparser. + + :param argparse.Subparser parser: A subparser to add arguments to. + ''' + + parser.add_argument('--memory', + help='The amount of memory to allocate, in GB. Defaults to 1.', + type=int, + default=1) + parser.add_argument('--adam_native_path', + help='The native path where ADAM is installed.' + 'Defaults to /opt/cgl-docker-lib/adam', + default='/opt/cgl-docker-lib/adam', + type=str) + parser.add_argument('--quinine_native_path', + help='The native path where Quinine is installed.' + 'Defaults to /opt/cgl-docker-lib/quinine', + default='/opt/cgl-docker-lib/quinine', + type=str) + Job.Runner.addToilOptions(parser) + + +def main(): + ''' + Parses arguments and starts the job. + ''' + + # build the argument parser + parser = argparse.ArgumentParser() + + # we run three different commands: hs, cont, rna + subparsers = parser.add_subparsers(dest='command') + parser_rna = subparsers.add_parser('rna', help='Runs the RNA QC metrics.') + parser_hs = subparsers.add_parser('targeted', + help='Runs the QC metrics for a targeted sequencing protocol.') + parser_cont = subparsers.add_parser('contamination', + help='Runs the contamination estimator.') + + # add arguments to the rna panel + parser_rna.add_argument('--reads', + help='The RNA-seq reads.', + type=str, + required=True) + parser_rna.add_argument('--transcriptome', + help='The transcriptome description (e.g., a GENCODE GTF)', + type=str, + required=True) + __add_common_args(parser_rna) + + # add arguments to the hs panel + parser_hs.add_argument('--reads', + help='The aligned reads.', + type=str, + required=True) + parser_hs.add_argument('--bait', + help='The bait used for capturing this panel.', + type=str, + required=True) + parser_hs.add_argument('--targets', + help='The regions covered by this panel.', + type=str, + required=True) + __add_common_args(parser_hs) + + # add arguments for contaimination estimation + parser_cont.add_argument('--reads', + help='The aligned reads.', + type=str, + required=True) + parser_cont.add_argument('--population', + help='The VCF to derive allele frequencies from.', + type=str, + required=True) + parser_cont.add_argument('--sample-vcf', + help='A VCF containing known genotypes for the sample.', + type=str, + required=True) + __add_common_args(parser_cont) + + # parse the arguments + args = parser.parse_args() + + # check which command got called, and set up and run + if args.command == 'rna': + Job.Runner.startToil(Job.wrapJobFn(run_rna_qc, + args.reads, + args.transcriptome, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + elif args.command == 'targeted': + Job.Runner.startToil(Job.wrapJobFn(run_targeted_qc, + args.reads, + args.bait, + args.targets, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + elif args.command == 'contamination': + Job.Runner.startToil(Job.wrapJobFn(run_contamination_estimation, + args.reads, + args.population, + args.sample_vcf, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + else: + raise ValueError('Unknown command: %s' % args.command) + +if __name__ == '__main__': + main() diff --git a/src/toil_scripts/tools/qc.py b/src/toil_scripts/tools/qc.py index c4a05413..8748b21a 100644 --- a/src/toil_scripts/tools/qc.py +++ b/src/toil_scripts/tools/qc.py @@ -1,5 +1,7 @@ import os +from subprocess import check_call +from toil_scripts.lib import require from toil_scripts.lib.files import tarball_files from toil_scripts.lib.programs import docker_call @@ -27,3 +29,159 @@ def run_fastqc(job, r1_id, r2_id): output_files = [os.path.join(work_dir, x) for x in output_names] tarball_files(tar_name='fastqc.tar.gz', file_paths=output_files, output_dir=work_dir) return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'fastqc.tar.gz')) + + +def __call_quinine(arguments, memory, + run_local=False, local_dir=None, + native_path=None, + leader_ip=None): + ''' + Run Quinine (https://github.com/bigdatagenomics/quinine). + + :param list arguments: Arguments to pass to Quinine. + :param int memory: Amount of memory in GiB to allocate per node. + :param bool run_local: If true, runs quinine in local mode. Default is false. + :param None or string local_dir: If provided, path to a local working + directory. Must be provided if run_local is true. + :param None or string native_path: If set, the path to quinine. If unset, + runs quinine via Docker. Default is None (unset). + :param None or SparkMasterAddress leader_ip: If provided, IP of the Spark + leader node. Must be provided if run_local is false. + ''' + + # TODO: factor this out into a common lib + SPARK_MASTER_PORT=7077 + HDFS_MASTER_PORT=8020 + + # validate input arguments + require((run_local and leader_ip is None) or + (not run_local and leader_ip is not None), + "Either run_local ({0}) must be set or leader_ip ({1}) must not be None.".format(run_local, + leader_ip)) + require(not run_local or local_dir is not None, + "If run_local is set, local_dir must be set.") + + # are we running locally, or not? set up master configuration + if inputs.run_local: + master = ["--master", "local[*]"] + work_dir = local_dir + else: + master = ["--master", + ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT))] + work_dir = '.' + + # set default parameters + default_params = master + [ + "--conf", ("spark.driver.memory=%dg" % inputs.memory), + "--conf", ("spark.executor.memory=%dg" % inputs.memory), + "--conf", "spark.driver.maxResultSize=0", + # set max result size to unlimited, see #177 + "--"] + + # are we running via docker or natively? + if native_path is None: + docker_call(rm=False, + tool="quay.io/ucsc_cgl/quinine", + docker_parameters=master_ip.docker_parameters(['--net=host']), + parameters=(default_params + arguments), + work_dir=work_dir, + mock=False) + else: + check_call(["%s/bin/quinine-submit" % native_path] + + default_params + + arguments) + + +def call_quinine_rna_native(reads, + transcriptome, + work_dir, + memory, + native_path): + ''' + Runs quinine to compute RNA-seq QC stats. + + :param str reads: Local path to input reads. + :param str transcriptome: Local path to transcriptome definition. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['rnaMetrics', + reads, transcriptome], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_hs_native(reads, + panel, + bait, + work_dir, + memory, + native_path): + ''' + Runs quinine to compute stats for a hybrid-selection targeted sequencing panel. + + :param str reads: Local path to input reads. + :param str panel: Local path to definition of regions targeted by panel. + :param str bait: Local path to definition of regions tiled by bait. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['panelMetrics', + reads, panel, bait, + ], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_contamination_native(reads, + genotypes, + annotations, + work_dir, + memory, + native_path): + ''' + Runs quinine to estimate inter-sample contamination. + + :param str reads: Local path to input reads. + :param str genotypes: Local path to genotypes. + :param str annotations: Local path to annotations. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['estimateContamination', + reads, genotypes, annotations], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_af_native(genotypes, + annotations, + work_dir, + memory, + native_path): + ''' + Runs quinine to estimate inter-sample contamination. + + :param str reads: Local path to input reads. + :param str genotypes: Local path to genotypes. + :param str annotations: Local path to annotations. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['computeAlleleFrequency', + genotypes, annotations], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path)