Add MarkDuplicatesSpark (#138)
* Add markdup spark process to BWA-MEM2 workflow for testing

* Allow for enabling/disabling use of Spark

* Typo fix

* Update changelog and readme

* Removing indexing process, config update, readme update

* Reverse Spark enabling option and update spark temp dir

* Serialize the MarkDuplicatesSpark process

* Add inline comments for completion signal

* Update number of CPUs for MarkDuplicatesSpark to 16

* Remove commented code, update formatting

* Update completion signal name in HISAT aligner workflow

* Update README to include MarkDuplicatesSpark

* Update complete_signal comment

* Move inline doc to module-level documentation
yashpatel6 authored Oct 1, 2021
1 parent 17e600f commit 958826b
Showing 13 changed files with 137 additions and 66 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -10,7 +10,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]
### Added
- GPL2 license added
- GPL2 License added
- MarkDuplicatesSpark process added as an option
### Changed
- Removed explicit index creation process and enabled option for MarkDuplicate process to create index
- Removed explicit maxForks allocation and instead allow CPU and memory allocation to dictate parallelization


## [7.1.0] - 2021-07-29
6 changes: 5 additions & 1 deletion README.md
@@ -117,7 +117,9 @@ The next step of the pipeline utilizes Picard Tool’s SortSam command to sort t

## 3. Mark Duplicates in BAM Files

The next step of the pipeline utilizes Picard Tool’s MarkDuplicates command to mark duplicates in the BAM files, (see Tools and Infrastructure Section for details). The MarkDuplicates command utilizes the VALIDATION_STRINGENCY=LENIENT option to indicate how errors should be handled and keep the process running if possible. Additionally, the Program_Record_Id is set to “MarkDuplicates”. For more details regarding the specific command that was run please refer to the Source Code section
The next step of the pipeline utilizes Picard Tool’s MarkDuplicates command to mark duplicates in the BAM files, (see Tools and Infrastructure Section for details). The MarkDuplicates command utilizes the VALIDATION_STRINGENCY=LENIENT option to indicate how errors should be handled and keep the process running if possible. Additionally, the Program_Record_Id is set to “MarkDuplicates”. For more details regarding the specific command that was run please refer to the Source Code section.

A faster Spark implementation of MarkDuplicates can also be used (MarkDuplicatesSpark from GATK). This process matches the output of Picard's MarkDuplicates with significant runtime improvements. Note, however, that the Spark version requires more disk space and can fail due to insufficient disk space when large inputs are run with multiple aligners specified. In such cases, Picard's MarkDuplicates should be used instead.
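
A minimal sketch of how this choice might be expressed in the pipeline config (parameter names as documented in the table below; the values shown are illustrative):

```groovy
// Illustrative only: fall back to Picard MarkDuplicates when disk space is limited
params {
    enable_spark   = false       // true (default) runs MarkDuplicatesSpark; false runs Picard MarkDuplicates
    spark_temp_dir = "/scratch"  // scratch space used by Spark; ensure sufficient free disk
}
```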

## 4. Index BAM Files

@@ -159,6 +161,8 @@ After marking dup BAM files, the BAM files are then indexed by utilizing Picar
| `temp_dir` | yes | path | Absolute path to the directory where the nextflow's intermediate files are saved. If your cluster worker node has the `/scratch` set up, this can be set to it. |
| `save_intermediate_files` | yes | boolean | Save intermediate files. If yes, not only the final BAM, but also the unmerged, unsorted, and duplicates unmarked BAM files will also be saved. |
| `cache_intermediate_pipeline_steps` | yes | boolean | Enable caching to resume the pipeline from the end of the last successful process completion when a pipeline fails (if true, the default submission script must be modified). |
| `enable_spark` | yes | boolean | Enable use of Spark processes. When true, `MarkDuplicatesSpark` will be used. When false, `MarkDuplicates` will be used. Default value is true. |
| `spark_temp_dir` | yes | path | Path to temp dir for Spark processes. Defaults to `/scratch`. |
| `max_number_of_parallel_jobs` | no | int | The maximum number of jobs or steps of the pipeline that can be run in parallel. Default is 1. Be very cautious about setting this to any value larger than 1, as it may cause out-of-memory errors. It may be helpful when running on a big-memory computing node. |
| `bwa_mem_number_of_cpus` | no | int | Number of cores to use for BWA-MEM2. If not set, this will be calculated to ensure at least 2.5Gb memory per core. |
| `blcds_registered_dataset_input` | yes | boolean | Input FASTQs are from the Boutros Lab data registry. |
6 changes: 5 additions & 1 deletion pipeline/align-DNA.nf
@@ -30,7 +30,6 @@ log.info """\
- options:
save_intermediate_files = ${params.save_intermediate_files}
cache_intermediate_pipeline_steps = ${params.cache_intermediate_pipeline_steps}
max_number_of_parallel_jobs = ${params.max_number_of_parallel_jobs}
blcds_registered_dataset_input = ${params.blcds_registered_dataset_input}
blcds_registered_dataset_output = ${params.blcds_registered_dataset_output}
@@ -93,6 +92,10 @@ workflow {
ich_reference_fasta_bwa,
ich_bwa_reference_index_files
)
bwa_mem2_complete_signal = align_DNA_BWA_MEM2_workflow.out.complete_signal
}
else { // If only running HISAT2, generate dummy signal
bwa_mem2_complete_signal = "bwa_mem2_complete"
}
if (params.aligner.contains("HISAT2")) {
Channel
@@ -102,6 +105,7 @@
.fromPath(params.reference_fasta_index_files_hisat2, checkIfExists: true)
.set { ich_hisat2_reference_index_files }
align_DNA_HISAT2_workflow(
bwa_mem2_complete_signal,
ich_samples,
ich_samples_validate,
ich_reference_fasta_hisat2,
6 changes: 3 additions & 3 deletions pipeline/config/execute.config
@@ -21,9 +21,9 @@ process {
cpus = 1
memory = 10.GB
}
withName: run_BuildBamIndex_Picard {
cpus = 1
memory = 4.GB
withName: run_MarkDuplicatesSpark_GATK {
cpus = 16
memory = 35.GB
}
}

7 changes: 2 additions & 5 deletions pipeline/config/methods.config
@@ -9,9 +9,6 @@ manifest {
}

params {
// resource configuraton for entire pipeline
max_number_of_parallel_jobs = 1

// tools and their versions
bwa_version = "BWA-MEM2-2.2.1"
hisat2_version = "HISAT2-2.2.1"
@@ -21,6 +18,7 @@ params {
docker_image_picardtools = "blcdsdockerregistry/align-dna:picardtools-2.23.3"
docker_image_sha512sum = "blcdsdockerregistry/align-dna:sha512sum-1.0"
docker_image_validate_params = "blcdsdockerregistry/validate:2.1.5"
docker_image_gatk = "broadinstitute/gatk:4.2.2.0"
}

if (!params.containsKey('check_node_config')) {
@@ -209,8 +207,7 @@ methods {
set_process = {
// monitor process jobs with local (not slurm) executor
process.executor = "local"
// total amount of resources avaible to the pipeline
process.maxForks = params.max_number_of_parallel_jobs

// echo stdout of each step to stdout of pipeline
process.echo = true
process.cache = params.cache_intermediate_pipeline_steps
6 changes: 3 additions & 3 deletions pipeline/config/midmem.config
@@ -21,8 +21,8 @@ process {
cpus = 1
memory = 10.GB
}
withName: run_BuildBamIndex_Picard {
cpus = 1
memory = 4.GB
withName: run_MarkDuplicatesSpark_GATK {
cpus = 16
memory = 35.GB
}
}
22 changes: 16 additions & 6 deletions pipeline/modules/align_DNA_BWA_MEM2.nf
@@ -6,7 +6,7 @@
include { run_validate; run_validate as validate_output_file } from './run_validate.nf'
include { run_SortSam_Picard } from './sort_bam_picardtools.nf'
include { run_MarkDuplicate_Picard } from './mark_duplicate_picardtools.nf'
include { run_BuildBamIndex_Picard } from './index_bam_picardtools.nf'
include { run_MarkDuplicatesSpark_GATK } from './mark_duplicates_spark.nf'
include { Generate_Sha512sum } from './check_512sum.nf'

process align_DNA_BWA_MEM2 {
@@ -79,13 +79,23 @@ workflow align_DNA_BWA_MEM2_workflow {
ich_reference_index_files.collect()
)
run_SortSam_Picard(align_DNA_BWA_MEM2.out.bam, aligner_output_dir)
run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir)
run_BuildBamIndex_Picard(run_MarkDuplicate_Picard.out.bam, aligner_output_dir)
Generate_Sha512sum(run_BuildBamIndex_Picard.out.bai.mix(run_MarkDuplicate_Picard.out.bam), aligner_output_dir)
if (params.enable_spark) {
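// BWA-MEM2 runs first when enabled, so a placeholder string stands in for the completion signal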
run_MarkDuplicatesSpark_GATK("completion_placeholder", run_SortSam_Picard.out.bam.collect(), aligner_output_dir)
och_markduplicates_bam = run_MarkDuplicatesSpark_GATK.out.bam
och_markduplicates_bam_index = run_MarkDuplicatesSpark_GATK.out.bam_index
} else {
run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir)
och_markduplicates_bam = run_MarkDuplicate_Picard.out.bam
och_markduplicates_bam_index = run_MarkDuplicate_Picard.out.bam_index
}
Generate_Sha512sum(och_markduplicates_bam_index.mix(och_markduplicates_bam), aligner_output_dir)
validate_output_file(
run_MarkDuplicate_Picard.out.bam.mix(
run_BuildBamIndex_Picard.out.bai,
och_markduplicates_bam.mix(
och_markduplicates_bam_index,
Channel.from(params.temp_dir, params.output_dir)
)
)

emit:
complete_signal = och_markduplicates_bam.collect()
}
21 changes: 15 additions & 6 deletions pipeline/modules/align_DNA_HISAT2.nf
@@ -6,7 +6,7 @@
include { run_validate; run_validate as validate_output_file } from './run_validate.nf'
include { run_SortSam_Picard } from './sort_bam_picardtools.nf'
include { run_MarkDuplicate_Picard } from './mark_duplicate_picardtools.nf'
include { run_BuildBamIndex_Picard } from './index_bam_picardtools.nf'
include { run_MarkDuplicatesSpark_GATK } from './mark_duplicates_spark.nf'
include { Generate_Sha512sum } from './check_512sum.nf'

process align_DNA_HISAT2 {
@@ -67,6 +67,7 @@ process align_DNA_HISAT2 {
workflow align_DNA_HISAT2_workflow {
aligner_output_dir = "${params.bam_output_dir}/${params.hisat2_version}"
take:
complete_signal //Output bam from previous MarkDuplicatesSpark process to ensure only one Spark process runs at a time
ich_samples
ich_samples_validate
ich_reference_fasta
@@ -82,12 +83,20 @@
ich_reference_index_files.collect()
)
run_SortSam_Picard(align_DNA_HISAT2.out.bam, aligner_output_dir)
run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir)
run_BuildBamIndex_Picard(run_MarkDuplicate_Picard.out.bam, aligner_output_dir)
Generate_Sha512sum(run_BuildBamIndex_Picard.out.bai.mix(run_MarkDuplicate_Picard.out.bam), aligner_output_dir)
if (params.enable_spark) {
// Run MarkDuplicatesSpark only after the BWA-MEM2 MarkDuplicatesSpark process completes
run_MarkDuplicatesSpark_GATK(complete_signal, run_SortSam_Picard.out.bam.collect(), aligner_output_dir)
och_markduplicates_bam = run_MarkDuplicatesSpark_GATK.out.bam
och_markduplicates_bam_index = run_MarkDuplicatesSpark_GATK.out.bam_index
} else {
run_MarkDuplicate_Picard(run_SortSam_Picard.out.bam.collect(), aligner_output_dir)
och_markduplicates_bam = run_MarkDuplicate_Picard.out.bam
och_markduplicates_bam_index = run_MarkDuplicate_Picard.out.bam_index
}
Generate_Sha512sum(och_markduplicates_bam_index.mix(och_markduplicates_bam), aligner_output_dir)
validate_output_file(
run_MarkDuplicate_Picard.out.bam.mix(
run_BuildBamIndex_Picard.out.bai,
och_markduplicates_bam.mix(
och_markduplicates_bam_index,
Channel.from(params.temp_dir, params.output_dir)
)
)
36 changes: 0 additions & 36 deletions pipeline/modules/index_bam_picardtools.nf

This file was deleted.

14 changes: 11 additions & 3 deletions pipeline/modules/mark_duplicate_picardtools.nf
@@ -1,11 +1,16 @@

// mark duplicates with picard
process run_MarkDuplicate_Picard {
process run_MarkDuplicate_Picard {
container params.docker_image_picardtools
containerOptions "--volume ${params.temp_dir}:/temp_dir"

publishDir path: "${bam_output_dir}",
pattern: "*.bam",
pattern: "*.{bam,bai}",
mode: 'copy'

publishDir path: "${bam_output_dir}",
pattern: "*.metrics",
enabled: params.save_intermediate_files,
mode: 'copy'

publishDir path: "${params.log_output_dir}/${task.process.replace(':', '/')}",
@@ -21,6 +26,8 @@ process run_MarkDuplicate_Picard {
// just the sample name (global variable), do not pass it as a val
output:
path bam_output_filename, emit: bam
path "*.bai", emit: bam_index
path "${params.sample_name}.mark_dup.metrics"
path(".command.*")

shell:
@@ -39,6 +46,7 @@ process run_MarkDuplicate_Picard {
--OUTPUT !{bam_output_filename} \
--METRICS_FILE !{params.sample_name}.mark_dup.metrics \
--ASSUME_SORT_ORDER coordinate \
--PROGRAM_RECORD_ID MarkDuplicates
--PROGRAM_RECORD_ID MarkDuplicates \
--CREATE_INDEX true
'''
}
63 changes: 63 additions & 0 deletions pipeline/modules/mark_duplicates_spark.nf
@@ -0,0 +1,63 @@
/*
Nextflow module for marking duplicates using Spark.
The containerOptions specifying user 'nobody' allow for Spark to be run without root access.
The beforeScript command allows the user 'nobody' to create the output files into the working directory.
Input:
completion_signal: output BAM from the previous MarkDuplicatesSpark process, used to ensure
only one Spark process runs at a time
*/
process run_MarkDuplicatesSpark_GATK {
container params.docker_image_gatk
containerOptions "--volume ${params.temp_dir}:/temp_dir --volume ${params.spark_temp_dir}:/spark_temp_dir -u nobody"

publishDir path: "${bam_output_dir}",
pattern: "*.bam{,.bai}",
mode: 'copy'

publishDir path: "${bam_output_dir}",
pattern: "*.metrics",
enabled: params.save_intermediate_files,
mode: 'copy'

publishDir path: "${params.log_output_dir}/${task.process.replace(':', '/')}",
pattern: ".command.*",
mode: "copy",
saveAs: { "log${file(it).getName()}" }

input:
val(completion_signal)
path(input_bams)
val(bam_output_dir)

// after marking duplicates, bams will be merged by library so the library name is not needed
// just the sample name (global variable), do not pass it as a val
output:
path bam_output_filename, emit: bam
path "*.bai", emit: bam_index
path "${params.sample_name}.mark_dup.metrics"
path(".command.*")

beforeScript 'chmod 777 `pwd`'

shell:
bam_output_filename = "${params.bam_output_filename}"
'''
set -euo pipefail
# add gatk option prefix, '--input' to each input bam
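# e.g. 'a.bam b.bam' becomes '--input a.bam --input b.bam' (illustrative filenames)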
declare -r INPUT=$(echo '!{input_bams}' | sed -e 's/ / --input /g' | sed '1s/^/--input /')
gatk --java-options "-Djava.io.tmpdir=/temp_dir" \
MarkDuplicatesSpark \
--read-validation-stringency LENIENT \
$INPUT \
--output !{bam_output_filename} \
--metrics-file !{params.sample_name}.mark_dup.metrics \
--program-name MarkDuplicatesSpark \
--create-output-bam-index \
--conf 'spark.executor.cores=!{task.cpus}' \
--conf 'spark.local.dir=/spark_temp_dir' \
--tmp-dir /temp_dir
'''
}
4 changes: 3 additions & 1 deletion pipeline/modules/sort_bam_picardtools.nf
@@ -31,6 +31,8 @@ process run_SortSam_Picard {
path(".command.*")

script:
// Determine sort order based on markduplicates process: queryname for spark and coordinate for Picard
sort_order = (params.enable_spark) ? "queryname" : "coordinate"
"""
set -euo pipefail
Expand All @@ -40,6 +42,6 @@ process run_SortSam_Picard {
--VALIDATION_STRINGENCY LENIENT \
--INPUT ${input_bam} \
--OUTPUT ${library}-${lane}.sorted.bam \
--SORT_ORDER coordinate
--SORT_ORDER ${sort_order}
"""
}
6 changes: 6 additions & 0 deletions pipeline/nextflow.example.config
@@ -26,6 +26,12 @@ params {
save_intermediate_files = false
cache_intermediate_pipeline_steps = false

// Spark options
// By default, the Spark process MarkDuplicatesSpark will be used. Set to false to disable Spark process and use MarkDuplicates (Picard) instead
enable_spark = true
// Default Spark temp dir is /scratch. Update if necessary
spark_temp_dir = "/scratch"

// set to true if the data input fastq files are registered in the Boutros Lab.
blcds_registered_dataset_input = false
// set to true to redirect output files directly to the Boutros Lab data storage.