Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modification from NF Language server #1779

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- [1767](https://github.com/nf-core/sarek/pull/1767) - Bump nf-schema version to 2.2.1
- [1779](https://github.com/nf-core/sarek/pull/1779) - Code modifications from the Nextflow language server

### Fixed

320 changes: 169 additions & 151 deletions main.nf

Large diffs are not rendered by default.

30 changes: 17 additions & 13 deletions subworkflows/local/bam_baserecalibrator/main.nf
Original file line number Diff line number Diff line change
@@ -21,34 +21,38 @@ workflow BAM_BASERECALIBRATOR {
versions = Channel.empty()

// Combine cram and intervals for spread and gather strategy
cram_intervals = cram.combine(intervals)
// Move num_intervals to meta map
.map{ meta, cram, crai, intervals, num_intervals -> [ meta + [ num_intervals:num_intervals ], cram, crai, intervals ] }
// Move num_intervals to meta map
cram_intervals = cram
.combine(intervals)
.map { meta, cram_, crai_, intervals_, num_intervals -> [meta + [num_intervals: num_intervals], cram_, crai_, intervals_] }

// RUN BASERECALIBRATOR
GATK4_BASERECALIBRATOR(cram_intervals, fasta.map{ meta, it -> [ it ] }, fasta_fai.map{ meta, it -> [ it ] }, dict.map{ meta, it -> [ it ] }, known_sites, known_sites_tbi)
GATK4_BASERECALIBRATOR(cram_intervals, fasta.map { _meta, fasta_ -> [fasta_] }, fasta_fai.map { _meta, fasta_fai_ -> [fasta_fai_] }, dict.map { _meta, dict_ -> [dict_] }, known_sites, known_sites_tbi)

// Figuring out if there is one or more table(s) from the same sample
table_to_merge = GATK4_BASERECALIBRATOR.out.table.map{ meta, table -> [ groupKey(meta, meta.num_intervals), table ] }.groupTuple().branch{
// Use meta.num_intervals to assess number of intervals
single: it[0].num_intervals <= 1
multiple: it[0].num_intervals > 1
}
// Use meta.num_intervals to assess number of intervals
table_to_merge = GATK4_BASERECALIBRATOR.out.table
.map { meta, table -> [groupKey(meta, meta.num_intervals), table] }
.groupTuple()
.branch {
single: it[0].num_intervals <= 1
multiple: it[0].num_intervals > 1
}

// Only when using intervals
GATK4_GATHERBQSRREPORTS(table_to_merge.multiple)

// Mix intervals and no_intervals channels together
table_bqsr = GATK4_GATHERBQSRREPORTS.out.table.mix(table_to_merge.single.map{ meta, table -> [ meta, table[0] ] })
// Remove no longer necessary field: num_intervals
.map{ meta, table -> [ meta - meta.subMap('num_intervals'), table ] }
// Remove no longer necessary field: num_intervals
table_bqsr = GATK4_GATHERBQSRREPORTS.out.table
.mix(table_to_merge.single.map { meta, table -> [meta, table[0]] })
.map { meta, table -> [meta - meta.subMap('num_intervals'), table] }

// Gather versions of all tools used
versions = versions.mix(GATK4_BASERECALIBRATOR.out.versions)
versions = versions.mix(GATK4_GATHERBQSRREPORTS.out.versions)

emit:
table_bqsr // channel: [ meta, table ]

versions // channel: [ versions.yml ]
}
120 changes: 67 additions & 53 deletions subworkflows/local/prepare_intervals/main.nf
Original file line number Diff line number Diff line change
@@ -14,43 +14,46 @@ include { TABIX_BGZIPTABIX as TABIX_BGZIPTABIX_INTERVAL_COMBINED } from '../../.

workflow PREPARE_INTERVALS {
take:
fasta_fai // mandatory [ fasta_fai ]
intervals // [ params.intervals ]
no_intervals // [ params.no_intervals ]
nucleotides_per_second
outdir
step
fasta_fai // mandatory [ params.fasta_fai ]
intervals // optional [ params.intervals ]
no_intervals // boolean [ params.no_intervals ]
nucleotides_per_second // mandatory [ params.nucleotides_per_second ]
outdir // mandatory [ params.outdir ]
step // mandatory [ params.step ]

main:
versions = Channel.empty()

intervals_bed = Channel.empty() // List of [ bed, num_intervals ], one for each region
intervals_bed_gz_tbi = Channel.empty() // List of [ bed.gz, bed.gz.tbi, num_intervals ], one for each region
intervals_combined = Channel.empty() // Single bed file containing all intervals
// intervals_bed - List of [ bed, num_intervals ], one per region
// intervals_bed_gz_tbi - List of [ bed.gz, bed.gz.tbi, num_intervals ], one per region
// intervals_bed_combined - Single bed file containing all intervals
intervals_bed = Channel.empty()
intervals_bed_gz_tbi = Channel.empty()
intervals_bed_combined = Channel.empty()

if (no_intervals) {
file("${outdir}/no_intervals.bed").text = "no_intervals\n"
file("${outdir}/no_intervals.bed.gz").text = "no_intervals\n"
file("${outdir}/no_intervals.bed").text = "no_intervals\n"
file("${outdir}/no_intervals.bed.gz").text = "no_intervals\n"
file("${outdir}/no_intervals.bed.gz.tbi").text = "no_intervals\n"

intervals_bed = Channel.fromPath(file("${outdir}/no_intervals.bed")).map{ it -> [ it, 0 ] }
intervals_bed_gz_tbi = Channel.fromPath(file("${outdir}/no_intervals.bed.{gz,gz.tbi}")).collect().map{ it -> [ it, 0 ] }
intervals_combined = Channel.fromPath(file("${outdir}/no_intervals.bed")).map{ it -> [ [ id:it.simpleName ], it ] }
} else if (step != 'annotate' && step != 'controlfreec') {
intervals_bed = Channel.fromPath(file("${outdir}/no_intervals.bed")).map { it -> [it, 0] }
intervals_bed_gz_tbi = Channel.fromPath(file("${outdir}/no_intervals.bed.{gz,gz.tbi}")).collect().map { it -> [it, 0] }
intervals_bed_combined = Channel.fromPath(file("${outdir}/no_intervals.bed")).map { it -> [[id: it.simpleName], it] }
}
else if (step != 'annotate' && step != 'controlfreec') {
// If no interval/target file is provided, then generate intervals from the FASTA file
if (!intervals) {
BUILD_INTERVALS(fasta_fai, [])

intervals_combined = BUILD_INTERVALS.out.output

CREATE_INTERVALS_BED(intervals_combined.map{ meta, path -> path }, nucleotides_per_second)
CREATE_INTERVALS_BED(BUILD_INTERVALS.out.output.map { _meta, intervals_ -> intervals_ }, nucleotides_per_second)

intervals_bed = CREATE_INTERVALS_BED.out.bed

versions = versions.mix(BUILD_INTERVALS.out.versions)
versions = versions.mix(CREATE_INTERVALS_BED.out.versions)
} else {
intervals_combined = Channel.fromPath(file(intervals)).map{it -> [ [ id:it.baseName ], it ] }
}
else {
intervals_bed_combined = Channel.fromPath(file(intervals)).map { intervals_ -> [[id: intervals_.baseName], intervals_] }
CREATE_INTERVALS_BED(file(intervals), nucleotides_per_second)

intervals_bed = CREATE_INTERVALS_BED.out.bed
@@ -59,60 +62,71 @@ workflow PREPARE_INTERVALS {

// If interval file is not provided as .bed, but e.g. as .interval_list then convert to BED format
if (intervals.endsWith(".interval_list")) {
GATK4_INTERVALLISTTOBED(intervals_combined)
intervals_combined = GATK4_INTERVALLISTTOBED.out.bed
GATK4_INTERVALLISTTOBED(intervals_bed_combined)
intervals_bed_combined = GATK4_INTERVALLISTTOBED.out.bed
versions = versions.mix(GATK4_INTERVALLISTTOBED.out.versions)
}
}

// Now for the intervals.bed the following operations are done:
// 1. Intervals file is split up into multiple bed files for scatter/gather
// 2. Each bed file is indexed

// 1. Intervals file is split up into multiple bed files for scatter/gather & grouping together small intervals
intervals_bed = intervals_bed.flatten()
.map{ intervalFile ->
// 1/ Split up intervals bed file into multiple bed files for scatter/gather
// 2/ Tabix index each bed file

// 1/ Split up intervals bed file into multiple bed files for scatter/gather
// Also group together small intervals
// And add the number of intervals to the channel
intervals_bed = intervals_bed
.flatten()
.map { intervals_ ->
def duration = 0.0
for (line in intervalFile.readLines()) {
final fields = line.split('\t')
if (fields.size() >= 5) duration += fields[4].toFloat()
else {
start = fields[1].toInteger()
end = fields[2].toInteger()
duration += (end - start) / nucleotides_per_second
intervals_
.readLines()
.each { line ->
def fields = line.split('\t')
if (fields.size() >= 5) {
duration += fields[4].toFloat()
}
else {
def start = fields[1].toInteger()
def end = fields[2].toInteger()
duration += (end - start) / nucleotides_per_second
}
}
}
[ duration, intervalFile ]
}.toSortedList({ a, b -> b[0] <=> a[0] })
.flatten().collate(2).map{ duration, intervalFile -> intervalFile }.collect()
// Adding number of intervals as elements
.map{ it -> [ it, it.size() ] }
[duration, intervals_]
}
.toSortedList { a, b -> b[0] <=> a[0] }
.flatten()
.collate(2)
.map { _duration, intervals_ -> intervals_ }
.collect()
.map { intervals_ -> [intervals_, intervals_.size()] }
.transpose()

// 2. Create bed.gz and bed.gz.tbi for each interval file. They are split by region (see above)
TABIX_BGZIPTABIX_INTERVAL_SPLIT(intervals_bed.map{ file, num_intervals -> [ [ id:file.baseName], file ] })
// 2/ Tabix index each bed file
TABIX_BGZIPTABIX_INTERVAL_SPLIT(intervals_bed.map { intervals_, _num_intervals -> [[id: intervals_.baseName], intervals_] })

intervals_bed_gz_tbi = TABIX_BGZIPTABIX_INTERVAL_SPLIT.out.gz_tbi.map{ meta, bed, tbi -> [ bed, tbi ] }.toList()
// Adding number of intervals as elements
.map{ it -> [ it, it.size() ] }
intervals_bed_gz_tbi = TABIX_BGZIPTABIX_INTERVAL_SPLIT.out.gz_tbi
.map { _meta, intervals_gz_, intervals_gz_tbi_ -> [intervals_gz_, intervals_gz_tbi_] }
.toList()
.map { it -> [it, it.size()] }
.transpose()

versions = versions.mix(TABIX_BGZIPTABIX_INTERVAL_SPLIT.out.versions)
}

TABIX_BGZIPTABIX_INTERVAL_COMBINED(intervals_combined)
TABIX_BGZIPTABIX_INTERVAL_COMBINED(intervals_bed_combined)
versions = versions.mix(TABIX_BGZIPTABIX_INTERVAL_COMBINED.out.versions)

intervals_bed_combined = intervals_combined.map{meta, bed -> bed }.collect()
intervals_bed_gz_tbi_combined = TABIX_BGZIPTABIX_INTERVAL_COMBINED.out.gz_tbi.map{meta, gz, tbi -> [gz, tbi] }.collect()
// intervals_bed and intervals_bed_gz_tbi are the intervals split for parallel execution, and contain the number of intervals
// intervals_bed_combined and intervals_bed_gz_tbi_combined are all intervals collected in one file

intervals_bed_combined = intervals_bed_combined.map { _meta, intervals_ -> intervals_ }.collect()
intervals_bed_gz_tbi_combined = TABIX_BGZIPTABIX_INTERVAL_COMBINED.out.gz_tbi.map { _meta, intervals_gz, intervals_gz_tbi -> [intervals_gz, intervals_gz_tbi] }.collect()

emit:
// Intervals split for parallel execution
intervals_bed // [ intervals.bed, num_intervals ]
intervals_bed_gz_tbi // [ intervals.bed.gz, intervals.bed.gz.tbi, num_intervals ]
// All intervals in one file
intervals_bed_combined // [ intervals.bed ]
intervals_bed_gz_tbi_combined // [ intervals.bed.gz, intervals.bed.gz.tbi ]

versions // [ versions.yml ]
versions // [ versions.yml ]
}
377 changes: 201 additions & 176 deletions subworkflows/local/samplesheet_to_channel/main.nf

Large diffs are not rendered by default.

593 changes: 325 additions & 268 deletions workflows/sarek/main.nf

Large diffs are not rendered by default.