-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathassemble.nf
180 lines (134 loc) · 4.44 KB
/
assemble.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// ------- Parameters definition -------
// Name of the run; it selects the runs/<run>/ directory used below.
params.run = "test"
// Thread count handed to the subsampling and assembly commands.
params.n_threads = 16
// Glob pattern matching the gzipped fastq read files of the run.
params.input_fastq = "runs/${params.run}/reads/*.fastq.gz"
// Directory into which the clustering results are published.
params.output_dir = file("runs/${params.run}/clustering")
// ------- processes -------
// pre-filtering step with filtlong
process filtlong {
    // Filters one read file with filtlong and emits the filtered reads,
    // keyed by the simple name of the input file (used as sample id downstream).
    label 'q30m'

    input:
    path fastq_file

    output:
    // `path` replaces the deprecated `file` qualifier and matches the input declaration.
    tuple val("${fastq_file.getSimpleName()}"), path("reads.fastq")

    script:
    // filtlong writes the retained reads to stdout, captured here as reads.fastq
    """
    filtlong --min_length 1000 --keep_percent 95 ${fastq_file} > reads.fastq
    """
}
// subsample the reads into 12 samples
process subsampler {
    // Runs `trycycler subsample` on the filtered reads of one sample and emits
    // the sample id together with the generated sample_*.fastq files.
    label 'q6h_subsample'
    // A failing subsampling task is ignored instead of stopping the pipeline.
    errorStrategy 'ignore'

    input:
    // `path` replaces the deprecated `file` qualifier.
    tuple val(sample_id), path("reads.fastq")

    output:
    // Optional: the task is still valid when no sample_*.fastq file is produced.
    tuple val(sample_id), path("${sample_id}/sample_*.fastq") optional true

    script:
    """
    trycycler subsample \
        --reads reads.fastq \
        --out_dir ${sample_id} \
        --threads ${params.n_threads}
    """
}
// assemble with flye
process assemble_flye {
    // Assembles one subsampled read set with flye and emits
    // [sample_id, assembly/assembly.fasta].
    label 'q30m'

    input:
    // sample_num is received but not used by the script.
    // `path` replaces the deprecated `file` qualifier.
    tuple val(sample_id), val(sample_num), path(fq)

    output:
    tuple val(sample_id), path("assembly/assembly.fasta")

    script:
    """
    flye --nano-raw ${fq} \
        --threads ${params.n_threads} \
        --out-dir assembly
    """
}
// assemble with raven
process assemble_raven {
    // Assembles one subsampled read set with raven and emits
    // [sample_id, assembly.fasta].
    label 'q30m'

    input:
    // sample_num is received but not used by the script.
    // `path` replaces the deprecated `file` qualifier.
    tuple val(sample_id), val(sample_num), path(fq)

    output:
    tuple val(sample_id), path("assembly.fasta")

    script:
    // `rm -f` so that a missing raven.cereal file does not turn an otherwise
    // successful assembly into a failed task.
    """
    raven --threads ${params.n_threads} ${fq} > assembly.fasta
    rm -f raven.cereal
    """
}
// assemble with miniasm and minipolish
process assemble_mini {
    // Assembles one subsampled read set with minimap2 + miniasm, polishes the
    // result with minipolish and emits [sample_id, assembly.fasta].
    label 'q30m'
    // A failing assembly is ignored instead of stopping the pipeline.
    errorStrategy 'ignore'

    input:
    // sample_num is received but not used by the script.
    // `path` replaces the deprecated `file` qualifier.
    tuple val(sample_id), val(sample_num), path(fq)

    output:
    tuple val(sample_id), path("assembly.fasta")

    script:
    // Steps: read-vs-read overlaps -> miniasm graph -> minipolish graph -> fasta.
    // The intermediate files are deleted at the end; only assembly.fasta is kept.
    """
    minimap2 -x ava-ont -t ${params.n_threads} ${fq} ${fq} > overlaps.paf
    miniasm -f ${fq} overlaps.paf > unpolished_assembly.gfa
    minipolish --threads ${params.n_threads} ${fq} unpolished_assembly.gfa > assembly.gfa
    any2fasta assembly.gfa > assembly.fasta
    rm overlaps.paf unpolished_assembly.gfa assembly.gfa
    """
}
// trycycler cluster. Takes as input the assembly files for each sample_id,
// along with the fastq reads. Resulting clusters are saved in the
// `clustering/sample_id` folder for further inspection.
process trycycler_cluster {
    // Runs `trycycler cluster` on all assemblies of one sample together with
    // its filtered reads, and publishes the resulting <sample_id> directory.
    label 'q6h_2h'
    publishDir params.output_dir, mode: 'copy'

    input:
    // The assemblies are staged under names matching assemblies_*.fasta.
    // `path` replaces the deprecated `file` qualifier.
    tuple val(sample_id), path("assemblies_*.fasta"), path("reads.fastq")

    output:
    path("$sample_id")

    script:
    // The reads are copied into the output directory so they are published
    // alongside the clusters.
    """
    trycycler cluster \
        --reads reads.fastq \
        --assemblies assemblies_*.fasta \
        --out_dir ${sample_id}
    cp reads.fastq ${sample_id}/filtlong_reads.fastq
    """
}
// -------- workflow -----------
// capture input
// One item per read file matching params.input_fastq; the run aborts with an
// error when the pattern matches nothing. The message now names the actual
// input type (fastq, not fasta).
channel.fromPath( params.input_fastq )
    .ifEmpty { error "Cannot find any fastq files matching: ${params.input_fastq}" }
    .set { input_samples }
workflow {
    // filter and subsample reads
    filtlong(input_samples)
    subsampler(filtlong.out)

    // subsampler emits items in the format [sample_id, [samples...]].
    // Turn them into single items [sample_id, sample number, file], where the
    // sample number is parsed from the `sample_<N>` file name, then split them
    // into three sub-channels, one per assembler:
    //   sample number <= 4 -> flye, >= 9 -> raven, everything else -> miniasm/minipolish
    to_assemble = subsampler.out
        .transpose() // pair sample_id to each file
        .map { sample_id, fq ->
            def sample_num = fq.getSimpleName().find(/(?<=^sample_)\d+$/).toInteger()
            [sample_id, sample_num, fq]
        }
        .branch { sample_id, sample_num, fq ->
            flye : sample_num <= 4
            raven : sample_num >= 9
            mini : true
        }

    // assemble each set with a different assembler
    assemble_flye(to_assemble.flye)
    assemble_raven(to_assemble.raven)
    assemble_mini(to_assemble.mini)

    // Collect all the assembled files. Items arrive as [sample_id, assembly.fasta]
    // and are grouped by sample_id into [sample_id, [assembly files...]], then
    // joined with the filtered reads of the same sample.
    // A group is emitted as soon as it holds 12 assemblies. `remainder: true`
    // also emits groups that never reach 12 (e.g. when an assembly task failed
    // and was ignored) once all assemblers have finished; without it such
    // samples would be dropped silently.
    assembled = assemble_flye.out
        .mix(assemble_raven.out, assemble_mini.out)
        .groupTuple(size: 12, remainder: true)
        .join(filtlong.out)

    // cluster results with trycycler and export results
    trycycler_cluster(assembled)
}