From 958826b23901dfd7294a34d591acf29095bca533 Mon Sep 17 00:00:00 2001 From: Yash Patel <86321070+yashpatel6@users.noreply.github.com> Date: Fri, 1 Oct 2021 11:43:06 -0700 Subject: [PATCH] Add MarkDuplicatesSpark (#138) * Add markdup spark process to BWA-MEM2 workflow for testing * Allow for enabling/disabling use of Spark * Typo fix * Update changelog and readme * Removing indexing process, config update, readme update * Reverse Spark enabling option and update spark temp dir * Serialize the MarkDuplicatesSpark process * Add inline comments for completion signa * Update number of CPUs for markduplicatessparkt to 16 * Remove commented code, update formatting * Update completion signal name in HISAT aligner workflow * Update README to include MarkDuplicatesSpark * Update complete_signal comment * Move inline doc to module-level documentation --- CHANGELOG.md | 6 +- README.md | 6 +- pipeline/align-DNA.nf | 6 +- pipeline/config/execute.config | 6 +- pipeline/config/methods.config | 7 +-- pipeline/config/midmem.config | 6 +- pipeline/modules/align_DNA_BWA_MEM2.nf | 22 +++++-- pipeline/modules/align_DNA_HISAT2.nf | 21 +++++-- pipeline/modules/index_bam_picardtools.nf | 36 ----------- .../modules/mark_duplicate_picardtools.nf | 14 ++++- pipeline/modules/mark_duplicates_spark.nf | 63 +++++++++++++++++++ pipeline/modules/sort_bam_picardtools.nf | 4 +- pipeline/nextflow.example.config | 6 ++ 13 files changed, 137 insertions(+), 66 deletions(-) delete mode 100644 pipeline/modules/index_bam_picardtools.nf create mode 100644 pipeline/modules/mark_duplicates_spark.nf diff --git a/CHANGELOG.md b/CHANGELOG.md index d359d0f5..48da7e81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] ### Added -- GPL2 license added +- GPL2 License added +- MarkDuplicatesSpark process added as an option +### Changed +- Removed explicit index creation process and enabled option for MarkDuplicate process to create index +- Removed explicit maxForks allocation and instead allow CPU and memory allocation to dictate parallelization ## [7.1.0] - 2021-07-29 diff --git a/README.md b/README.md index 3e46d462..4288a1ab 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,9 @@ The next step of the pipeline utilizes Picard Tool’s SortSam command to sort t ## 3. Mark Duplicates in BAM Files -The next step of the pipeline utilizes Picard Tool’s MarkDuplicates command to mark duplicates in the BAM files, (see Tools and Infrastructure Section for details). The MarkDuplicates command utilizes the VALIDATION_STRINGENCY=LENIENT option to indicate how errors should be handled and keep the process running if possible. Additionally, the Program_Record_Id is set to “MarkDuplicates”. For more details regarding the specific command that was run please refer to the Source Code section +The next step of the pipeline utilizes Picard Tool’s MarkDuplicates command to mark duplicates in the BAM files, (see Tools and Infrastructure Section for details). The MarkDuplicates command utilizes the VALIDATION_STRINGENCY=LENIENT option to indicate how errors should be handled and keep the process running if possible. Additionally, the Program_Record_Id is set to “MarkDuplicates”. For more details regarding the specific command that was run please refer to the Source Code section. + +A faster Spark implementation of MarkDuplicates can also be used (MarkDuplicatesSpark from GATK). The process matches the output of Picard's MarkDuplicates with significant runtime improvements. An important note, however, the Spark version requires more disk space and can fail with large inputs with multiple aligners being specified due to insufficient disk space. In such cases, Picard's MarkDuplicates should be used instead. ## 4. Index BAM Files @@ -159,6 +161,8 @@ After marking dup BAM files, the BAM files are then indexed by utilizing Picard | `temp_dir` | yes | path | Absolute path to the directory where the nextflow's intermediate files are saved. If your cluster worker node has the `/scratch` set up, this can be set to it. | | `save_intermediate_files` | yes | boolean | Save intermediate files. If yes, not only the final BAM, but also the unmerged, unsorted, and duplicates unmarked BAM files will also be saved. | | `cache_intermediate_pipeline_steps` | yes | boolean | Enable cahcing to resume pipeline and the end of the last successful process completion when a pipeline fails (if true the default submission script must be modified). | +| `enable_spark` | yes | boolean | Enable use of Spark processes. When true, `MarkDuplicatesSpark` will be used. When false, `MarkDuplicates` will be used. Default value is true. | +| `spark_temp_dir` | yes | path | Path to temp dir for Spark processes. Defaults to `/scratch`. | | `max_number_of_parallel_jobs` | no | int | The maximum number of jobs or steps of the pipeline that can be ran in parallel. Default is 1. Be very cautious setting this to any value larger than 1, as it may cause out-of-memory error. It may be helpful when running on a big memory computing node. | | `bwa_mem_number_of_cpus` | no | int | Number of cores to use for BWA-MEM2. If not set, this will be calculated to ensure at least 2.5Gb memory per core. | | `blcds_registered_dataset_input` | yes | boolean | Input FASTQs are from the Boutros Lab data registry. | diff --git a/pipeline/align-DNA.nf b/pipeline/align-DNA.nf index 6d8cc014..4afae7f5 100644 --- a/pipeline/align-DNA.nf +++ b/pipeline/align-DNA.nf @@ -30,7 +30,6 @@ log.info """\ - options: save_intermediate_files = ${params.save_intermediate_files} cache_intermediate_pipeline_steps = ${params.cache_intermediate_pipeline_steps} - max_number_of_parallel_jobs = ${params.max_number_of_parallel_jobs} blcds_registered_dataset_input = ${params.blcds_registered_dataset_input} blcds_registered_dataset_output = ${params.blcds_registered_dataset_output} @@ -93,6 +92,10 @@ workflow { ich_reference_fasta_bwa, ich_bwa_reference_index_files ) + bwa_mem2_complete_signal = align_DNA_BWA_MEM2_workflow.out.complete_signal + } + else {// If only running HISAT2, generate dummy signal + bwa_mem2_complete_signal = "bwa_mem2_complete" } if (params.aligner.contains("HISAT2")) { Channel @@ -102,6 +105,7 @@ workflow { .fromPath(params.reference_fasta_index_files_hisat2, checkIfExists: true) .set { ich_hisat2_reference_index_files } align_DNA_HISAT2_workflow( + bwa_mem2_complete_signal, ich_samples, ich_samples_validate, ich_reference_fasta_hisat2, diff --git a/pipeline/config/execute.config b/pipeline/config/execute.config index a94e73cd..041c0b9b 100644 --- a/pipeline/config/execute.config +++ b/pipeline/config/execute.config @@ -21,9 +21,9 @@ process { cpus = 1 memory = 10.GB } - withName: run_BuildBamIndex_Picard { - cpus = 1 - memory = 4.GB + withName: run_MarkDuplicatesSpark_GATK { + cpus = 16 + memory = 35.GB } } diff --git a/pipeline/config/methods.config b/pipeline/config/methods.config index 7eea686d..316cefea 100644 --- a/pipeline/config/methods.config +++ b/pipeline/config/methods.config @@ -9,9 +9,6 @@ manifest { } params { - // resource configuraton for entire pipeline - max_number_of_parallel_jobs = 1 - // tools and their versions bwa_version = "BWA-MEM2-2.2.1" hisat2_version = "HISAT2-2.2.1" @@ -21,6 +18,7 @@ params { docker_image_picardtools = "blcdsdockerregistry/align-dna:picardtools-2.23.3" docker_image_sha512sum = "blcdsdockerregistry/align-dna:sha512sum-1.0" docker_image_validate_params = "blcdsdockerregistry/validate:2.1.5" + docker_image_gatk = "broadinstitute/gatk:4.2.2.0" } if (!params.containsKey('check_node_config')) { @@ -209,8 +207,7 @@ methods { set_process = { // monitor process jobs with local (not slurm) executor process.executor = "local" - // total amount of resources avaible to the pipeline - process.maxForks = params.max_number_of_parallel_jobs + // echo stdout of each step to stdout of pipeline process.echo = true process.cache = params.cache_intermediate_pipeline_steps diff --git a/pipeline/config/midmem.config b/pipeline/config/midmem.config index 00a5048e..7701379d 100644 --- a/pipeline/config/midmem.config +++ b/pipeline/config/midmem.config @@ -21,8 +21,8 @@ process { cpus = 1 memory = 10.GB } - withName: run_BuildBamIndex_Picard { - cpus = 1 - memory = 4.GB + withName: run_MarkDuplicatesSpark_GATK { + cpus = 16 + memory = 35.GB } } diff --git a/pipeline/modules/align_DNA_BWA_MEM2.nf b/pipeline/modules/align_DNA_BWA_MEM2.nf index dd53469c..b6785b84 100644 --- a/pipeline/modules/align_DNA_BWA_MEM2.nf +++ b/pipeline/modules/align_DNA_BWA_MEM2.nf @@ -6,7 +6,7 @@ include { run_validate; run_validate as validate_output_file } from './run_validate.nf' include { run_SortSam_Picard } from './sort_bam_picardtools.nf' include { run_MarkDuplicate_Picard } from './mark_duplicate_picardtools.nf' -include { run_BuildBamIndex_Picard } from './index_bam_picardtools.nf' +include { run_MarkDuplicatesSpark_GATK } from './mark_duplicates_spark.nf' include { Generate_Sha512sum } from './check_512sum.nf' process align_DNA_BWA_MEM2 { @@ -79,13 +79,23 @@ workflow align_DNA_BWA_MEM2_workflow { ich_reference_index_files.collect() ) run_SortSam_Picard(align_DNA_BWA_MEM2.out.bam, aligner_output_dir) - run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir) - run_BuildBamIndex_Picard(run_MarkDuplicate_Picard.out.bam, aligner_output_dir) - Generate_Sha512sum(run_BuildBamIndex_Picard.out.bai.mix(run_MarkDuplicate_Picard.out.bam), aligner_output_dir) + if (params.enable_spark) { + run_MarkDuplicatesSpark_GATK("completion_placeholder", run_SortSam_Picard.out.bam.collect(), aligner_output_dir) + och_markduplicates_bam = run_MarkDuplicatesSpark_GATK.out.bam + och_markduplicates_bam_index = run_MarkDuplicatesSpark_GATK.out.bam_index + } else { + run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir) + och_markduplicates_bam = run_MarkDuplicate_Picard.out.bam + och_markduplicates_bam_index = run_MarkDuplicate_Picard.out.bam_index + } + Generate_Sha512sum(och_markduplicates_bam_index.mix(och_markduplicates_bam), aligner_output_dir) validate_output_file( - run_MarkDuplicate_Picard.out.bam.mix( - run_BuildBamIndex_Picard.out.bai, + och_markduplicates_bam.mix( + och_markduplicates_bam_index, Channel.from(params.temp_dir, params.output_dir) ) ) + + emit: + complete_signal = och_markduplicates_bam.collect() } diff --git a/pipeline/modules/align_DNA_HISAT2.nf b/pipeline/modules/align_DNA_HISAT2.nf index ba503c6e..027890d7 100644 --- a/pipeline/modules/align_DNA_HISAT2.nf +++ b/pipeline/modules/align_DNA_HISAT2.nf @@ -6,7 +6,7 @@ include { run_validate; run_validate as validate_output_file } from './run_validate.nf' include { run_SortSam_Picard } from './sort_bam_picardtools.nf' include { run_MarkDuplicate_Picard } from './mark_duplicate_picardtools.nf' -include { run_BuildBamIndex_Picard } from './index_bam_picardtools.nf' +include { run_MarkDuplicatesSpark_GATK } from './mark_duplicates_spark.nf' include { Generate_Sha512sum } from './check_512sum.nf' process align_DNA_HISAT2 { @@ -67,6 +67,7 @@ process align_DNA_HISAT2 { workflow align_DNA_HISAT2_workflow { aligner_output_dir = "${params.bam_output_dir}/${params.hisat2_version}" take: + complete_signal //Output bam from previous MarkDuplicatesSpark process to ensure only one Spark process runs at a time ich_samples ich_samples_validate ich_reference_fasta @@ -82,12 +83,20 @@ workflow align_DNA_HISAT2_workflow { ich_reference_index_files.collect() ) run_SortSam_Picard(align_DNA_HISAT2.out.bam, aligner_output_dir) - run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir) - run_BuildBamIndex_Picard(run_MarkDuplicate_Picard.out.bam, aligner_output_dir) - Generate_Sha512sum(run_BuildBamIndex_Picard.out.bai.mix(run_MarkDuplicate_Picard.out.bam), aligner_output_dir) + if (params.enable_spark) { + //Run MarkduplicatesSpark only after BWA-MEM2 markduplicatesspark completes + run_MarkDuplicatesSpark_GATK(complete_signal, run_SortSam_Picard.out.bam.collect(), aligner_output_dir) + och_markduplicates_bam = run_MarkDuplicatesSpark_GATK.out.bam + och_markduplicates_bam_index = run_MarkDuplicatesSpark_GATK.out.bam_index + } else { + run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir) + och_markduplicates_bam = run_MarkDuplicate_Picard.out.bam + och_markduplicates_bam_index = run_MarkDuplicate_Picard.out.bam_index + } + Generate_Sha512sum(och_markduplicates_bam_index.mix(och_markduplicates_bam), aligner_output_dir) validate_output_file( - run_MarkDuplicate_Picard.out.bam.mix( - run_BuildBamIndex_Picard.out.bai, + och_markduplicates_bam.mix( + och_markduplicates_bam_index, Channel.from(params.temp_dir, params.output_dir) ) ) diff --git a/pipeline/modules/index_bam_picardtools.nf b/pipeline/modules/index_bam_picardtools.nf deleted file mode 100644 index f37095af..00000000 --- a/pipeline/modules/index_bam_picardtools.nf +++ /dev/null @@ -1,36 +0,0 @@ - -// index bams with picard -process run_BuildBamIndex_Picard { - container params.docker_image_picardtools - containerOptions "--volume ${params.temp_dir}:/temp_dir" - - publishDir path: "${bam_output_dir}", - pattern: "*.bam.bai", - mode: 'copy' - - publishDir path: "${params.log_output_dir}/${task.process.replace(':', '/')}", - pattern: ".command.*", - mode: "copy", - saveAs: { "log${file(it).getName()}" } - - input: - path(input_bam) - val(bam_output_dir) - - // no need for an output channel becuase this is the final stepp - output: - path "${input_bam.getName()}.bai", emit: bai - path(".command.*") - - script: - """ - set -euo pipefail - - java -Xmx${params.mem_command_build_bam_index} -Djava.io.tmpdir=/temp_dir \ - -jar /picard-tools/picard.jar \ - BuildBamIndex \ - --VALIDATION_STRINGENCY LENIENT \ - --INPUT ${input_bam} \ - --OUTPUT ${input_bam.getName()}.bai - """ - } diff --git a/pipeline/modules/mark_duplicate_picardtools.nf b/pipeline/modules/mark_duplicate_picardtools.nf index b10f69d8..0862b208 100644 --- a/pipeline/modules/mark_duplicate_picardtools.nf +++ b/pipeline/modules/mark_duplicate_picardtools.nf @@ -1,11 +1,16 @@ // mark duplicates with picard -process run_MarkDuplicate_Picard { +process run_MarkDuplicate_Picard { container params.docker_image_picardtools containerOptions "--volume ${params.temp_dir}:/temp_dir" publishDir path: "${bam_output_dir}", - pattern: "*.bam", + pattern: "*.{bam,bai}", + mode: 'copy' + + publishDir path: "${bam_output_dir}", + pattern: "*.metrics", + enabled: params.save_intermediate_files, mode: 'copy' publishDir path: "${params.log_output_dir}/${task.process.replace(':', '/')}", @@ -21,6 +26,8 @@ process run_MarkDuplicate_Picard { // just the sample name (global variable), do not pass it as a val output: path bam_output_filename, emit: bam + path "*.bai", emit: bam_index + path "${params.sample_name}.mark_dup.metrics" path(".command.*") shell: @@ -39,6 +46,7 @@ process run_MarkDuplicate_Picard { --OUTPUT !{bam_output_filename} \ --METRICS_FILE !{params.sample_name}.mark_dup.metrics \ --ASSUME_SORT_ORDER coordinate \ - --PROGRAM_RECORD_ID MarkDuplicates + --PROGRAM_RECORD_ID MarkDuplicates \ + --CREATE_INDEX true ''' } diff --git a/pipeline/modules/mark_duplicates_spark.nf b/pipeline/modules/mark_duplicates_spark.nf new file mode 100644 index 00000000..653b7101 --- /dev/null +++ b/pipeline/modules/mark_duplicates_spark.nf @@ -0,0 +1,63 @@ +/* + Nextflow module for marking duplicates using Spark. + The containerOptions specifying user 'nobody' allow for Spark to be run without root access. + The beforeScript command allows the user 'nobody' to create the output files into the working directory. + + Input: + completion_signal: output bam from previous markduplicatesspark process to ensure + only one spark process runs at a time +*/ +process run_MarkDuplicatesSpark_GATK { + container params.docker_image_gatk + containerOptions "--volume ${params.temp_dir}:/temp_dir --volume ${params.spark_temp_dir}:/spark_temp_dir -u nobody" + + publishDir path: "${bam_output_dir}", + pattern: "*.bam{,.bai}", + mode: 'copy' + + publishDir path: "${bam_output_dir}", + pattern: "*.metrics", + enabled: params.save_intermediate_files, + mode: 'copy' + + publishDir path: "${params.log_output_dir}/${task.process.replace(':', '/')}", + pattern: ".command.*", + mode: "copy", + saveAs: { "log${file(it).getName()}" } + + input: + val(completion_signal) + path(input_bams) + val(bam_output_dir) + + // after marking duplicates, bams will be merged by library so the library name is not needed + // just the sample name (global variable), do not pass it as a val + output: + path bam_output_filename, emit: bam + path "*.bai", emit: bam_index + path "${params.sample_name}.mark_dup.metrics" + path(".command.*") + + beforeScript 'chmod 777 `pwd`' + + shell: + bam_output_filename = "${params.bam_output_filename}" + ''' + set -euo pipefail + + # add gatk option prefix, '--input' to each input bam + declare -r INPUT=$(echo '!{input_bams}' | sed -e 's/ / --input /g' | sed '1s/^/--input /') + + gatk --java-options "-Djava.io.tmpdir=/temp_dir" \ + MarkDuplicatesSpark \ + --read-validation-stringency LENIENT \ + $INPUT \ + --output !{bam_output_filename} \ + --metrics-file !{params.sample_name}.mark_dup.metrics \ + --program-name MarkDuplicatesSpark \ + --create-output-bam-index \ + --conf 'spark.executor.cores=${task.cpus}' \ + --conf 'spark.local.dir=/spark_temp_dir' \ + --tmp-dir /temp_dir + ''' + } diff --git a/pipeline/modules/sort_bam_picardtools.nf b/pipeline/modules/sort_bam_picardtools.nf index 24a24243..592b6217 100644 --- a/pipeline/modules/sort_bam_picardtools.nf +++ b/pipeline/modules/sort_bam_picardtools.nf @@ -31,6 +31,8 @@ process run_SortSam_Picard { path(".command.*") script: + // Determine sort order based on markduplicates process: queryname for spark and coordinate for Picard + sort_order = (params.enable_spark) ? "queryname" : "coordinate" """ set -euo pipefail @@ -40,6 +42,6 @@ process run_SortSam_Picard { --VALIDATION_STRINGENCY LENIENT \ --INPUT ${input_bam} \ --OUTPUT ${library}-${lane}.sorted.bam \ - --SORT_ORDER coordinate + --SORT_ORDER ${sort_order} """ } diff --git a/pipeline/nextflow.example.config b/pipeline/nextflow.example.config index 8f9ab00d..bd781d52 100644 --- a/pipeline/nextflow.example.config +++ b/pipeline/nextflow.example.config @@ -26,6 +26,12 @@ params { save_intermediate_files = false cache_intermediate_pipeline_steps = false + // Spark options + // By default, the Spark process MarkDuplicatesSpark will be used. Set to false to disable Spark process and use MarkDuplicates (Picard) instead + enable_spark = true + // Default Spark temp dir is /scratch. Update if necessary + spark_temp_dir = "/scratch" + // set to true if the data input fastq files are registered in the Boutros Lab. blcds_registered_dataset_input = false // set to true to redirect output files directly to the Boutros Lab data storage.